Posted to jira@kafka.apache.org by "mjsax (via GitHub)" <gi...@apache.org> on 2023/05/02 04:10:02 UTC

[GitHub] [kafka] mjsax opened a new pull request, #13656: KAFKA-14911: Add system tests for rolling upgrade path of KIP-904

mjsax opened a new pull request, #13656:
URL: https://github.com/apache/kafka/pull/13656

   Co-Author: @vcrfxia 
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] mjsax commented on a diff in pull request #13656: KAFKA-14911: Add system tests for rolling upgrade path of KIP-904

Posted by "mjsax (via GitHub)" <gi...@apache.org>.
mjsax commented on code in PR #13656:
URL: https://github.com/apache/kafka/pull/13656#discussion_r1185618918


##########
streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java:
##########
@@ -106,7 +106,11 @@ private static class ValueList {
         }
 
         int next() {
-            return (index < values.length) ? values[index++] : -1;
+            final int v = values[index++];
+            if (index >= values.length) {
+                index = 0;
+            }

Review Comment:
   Seems the comment is outdated. The custom TS-extractor was removed years ago: https://github.com/apache/kafka/commit/52e397962b624f3c881b6f99e71c94da32cf6a33
   
   Let me delete the comment.





[GitHub] [kafka] fqaiser94 commented on a diff in pull request #13656: KAFKA-14911: Add system tests for rolling upgrade path of KIP-904

Posted by "fqaiser94 (via GitHub)" <gi...@apache.org>.
fqaiser94 commented on code in PR #13656:
URL: https://github.com/apache/kafka/pull/13656#discussion_r1190418116


##########
tests/kafkatest/tests/streams/streams_upgrade_test.py:
##########
@@ -236,6 +237,96 @@ def test_rolling_upgrade_with_2_bounces(self, from_version, to_version):
 
         self.stop_and_await()
 
+    @cluster(num_nodes=6)
+    @matrix(from_version=table_agg_versions, to_version=[str(DEV_VERSION)])
+    def test_rolling_upgrade_for_table_agg(self, from_version, to_version):
+        """
+        This test verifies that the cluster successfully upgrades despite changes in the table
+        repartition topic format.
+
+        Starts 3 KafkaStreams instances with version <from_version> and upgrades one-by-one to <to_version>
+        """
+
+        extra_properties = {'test.run_table_agg': 'true'}
+
+        self.set_up_services()
+
+        self.driver.start()
+
+        # encoding different target values for different versions
+        #  - old version: value=A
+        #  - new version with `upgrade_from` flag set: value=B
+        #  - new version w/o `upgrade_from` set set: value=C
+
+        extra_properties = extra_properties.copy()
+        extra_properties['test.agg_produce_value'] = 'A'
+        extra_properties['test.expected_agg_values'] = 'A'
+        self.start_all_nodes_with(from_version, extra_properties)
+
+        counter = 1
+        random.seed()
+
+        # rolling bounce
+        random.shuffle(self.processors)
+        p3 = self.processors[-1]
+        for p in self.processors:
+            p.CLEAN_NODE_ENABLED = False
+
+        # bounce two instances to new version (verifies that new version can process records
+        # written by old version)
+        extra_properties = extra_properties.copy()
+        extra_properties['test.agg_produce_value'] = 'B'
+        extra_properties['test.expected_agg_values'] = 'A,B'
+        for p in self.processors[:-1]:
+            self.do_stop_start_bounce(p, from_version[:-2], to_version, counter, extra_properties)
+            counter = counter + 1
+
+        # bounce remaining instance on old version (just for verification purposes, to verify that
+        # instance on old version can process records written by new version)
+        extra_properties = extra_properties.copy()
+        extra_properties['test.agg_produce_value'] = 'A'
+        extra_properties['test.expected_agg_values'] = 'A,B'
+        self.do_stop_start_bounce(p3, None, from_version, counter, extra_properties)
+        counter = counter + 1
+
+        self.wait_for_table_agg_success('A,B')

Review Comment:
   ```suggestion
           # bounce remaining instance on old version to produce a new unique value
           extra_properties = extra_properties.copy()
           extra_properties['test.agg_produce_value'] = 'C'
           extra_properties['test.expected_agg_values'] = 'A,B,C'
           self.do_stop_start_bounce(p3, None, from_version, counter, extra_properties)
           counter = counter + 1
           
           # verify that new version can process records from old version
           self.wait_for_table_agg_success('A,B,C')
   ```
   
   I think it might be better to have this old (not-upgraded) instance start producing a new value `'C'` when we bounce it here. That way, we can assert using `self.wait_for_table_agg_success('A,B,C')` and be sure that the two upgraded instances have successfully processed messages from the old (not-upgraded) instance as well. 
   
   ![image](https://github.com/apache/kafka/assets/20507243/0f703375-d2b9-44e8-85f1-94a7dc2a6398)
   
   
   (Note, if you accept this change, you will need to make changes below here to produce new unique values like D,E,F, etc.)
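To illustrate the reasoning above, here is a minimal sketch in plain Python. It is not the real test harness: `seen_values` is a hypothetical helper that stands in for the aggregate's sorted, de-duplicated list of seen values, and the record lists are made up. It shows that re-producing `'A'` cannot prove the upgraded instances read anything from the bounced old instance, while a fresh value `'C'` can.

```python
def seen_values(records):
    """Hypothetical stand-in for the aggregate: sorted, de-duplicated values joined by commas."""
    return ",".join(sorted(set(records)))


# Values an upgraded instance has already aggregated before the old instance is bounced.
before_bounce = ["A", "A", "B"]

# Current test: the bounced old instance keeps producing 'A'.
processed_a = seen_values(before_bounce + ["A"])  # old instance's new records were processed
skipped_a = seen_values(before_bounce)            # old instance's new records were NOT processed

# Suggested change: the bounced old instance produces a new unique value 'C'.
processed_c = seen_values(before_bounce + ["C"])
skipped_c = seen_values(before_bounce)

print(processed_a, skipped_a)  # identical, so the assertion cannot tell the cases apart
print(processed_c, skipped_c)  # different, so waiting for 'A,B,C' is meaningful
```

With `'A'` both cases end in the same string, so `wait_for_table_agg_success('A,B')` would pass either way; with `'C'` only the "processed" case reaches `'A,B,C'`.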







[GitHub] [kafka] fqaiser94 commented on a diff in pull request #13656: KAFKA-14911: Add system tests for rolling upgrade path of KIP-904

Posted by "fqaiser94 (via GitHub)" <gi...@apache.org>.
fqaiser94 commented on code in PR #13656:
URL: https://github.com/apache/kafka/pull/13656#discussion_r1190412985


##########
tests/kafkatest/tests/streams/streams_upgrade_test.py:
##########
@@ -236,6 +237,96 @@ def test_rolling_upgrade_with_2_bounces(self, from_version, to_version):
 
         self.stop_and_await()
 
+    @cluster(num_nodes=6)
+    @matrix(from_version=table_agg_versions, to_version=[str(DEV_VERSION)])
+    def test_rolling_upgrade_for_table_agg(self, from_version, to_version):
+        """
+        This test verifies that the cluster successfully upgrades despite changes in the table
+        repartition topic format.
+
+        Starts 3 KafkaStreams instances with version <from_version> and upgrades one-by-one to <to_version>
+        """
+
+        extra_properties = {'test.run_table_agg': 'true'}
+
+        self.set_up_services()
+
+        self.driver.start()
+
+        # encoding different target values for different versions
+        #  - old version: value=A
+        #  - new version with `upgrade_from` flag set: value=B
+        #  - new version w/o `upgrade_from` set set: value=C
+
+        extra_properties = extra_properties.copy()
+        extra_properties['test.agg_produce_value'] = 'A'
+        extra_properties['test.expected_agg_values'] = 'A'
+        self.start_all_nodes_with(from_version, extra_properties)
+
+        counter = 1
+        random.seed()
+
+        # rolling bounce
+        random.shuffle(self.processors)
+        p3 = self.processors[-1]
+        for p in self.processors:
+            p.CLEAN_NODE_ENABLED = False
+

Review Comment:
   ```suggestion
   self.wait_for_table_agg_success('A')
   ```
   
   nit: could we add this assertion here? This is more to "set the stage" for readers of the code and make the state transitions happening in the test easier to grasp. My understanding of the situation at this point in the code is as follows: 
   
   ![image](https://github.com/apache/kafka/assets/20507243/dc5c5c90-8f28-42dd-8d38-4ec55bd12e29)
   





[GitHub] [kafka] mjsax commented on a diff in pull request #13656: KAFKA-14911: Add system tests for rolling upgrade path of KIP-904

Posted by "mjsax (via GitHub)" <gi...@apache.org>.
mjsax commented on code in PR #13656:
URL: https://github.com/apache/kafka/pull/13656#discussion_r1182064796


##########
streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java:
##########
@@ -106,7 +106,11 @@ private static class ValueList {
         }
 
         int next() {
-            return (index < values.length) ? values[index++] : -1;

Review Comment:
   The test produced too much data, and thus ended up sending `-1` for a lot of records at the end.
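For readers who want to see the difference concretely, here is a small Python sketch of the two `next()` behaviours from the hunk (the real code is Java; the class names `ValueListOld` and `ValueListNew` are made up for this illustration). The old version returns `-1` once the values are exhausted, the new one wraps around to the start.

```python
class ValueListOld:
    """Mirrors the removed line: return -1 once all values have been handed out."""

    def __init__(self, values):
        self.values = list(values)
        self.index = 0

    def next(self):
        if self.index < len(self.values):
            v = self.values[self.index]
            self.index += 1
            return v
        return -1


class ValueListNew:
    """Mirrors the added lines: reset the index so the values repeat."""

    def __init__(self, values):
        self.values = list(values)
        self.index = 0

    def next(self):
        v = self.values[self.index]
        self.index += 1
        if self.index >= len(self.values):
            self.index = 0
        return v


old, new = ValueListOld([1, 2, 3]), ValueListNew([1, 2, 3])
old_out = [old.next() for _ in range(5)]
new_out = [new.next() for _ in range(5)]
print(old_out)  # [1, 2, 3, -1, -1]
print(new_out)  # [1, 2, 3, 1, 2]
```

Asking for more values than the list holds is exactly the "too much data" case: the old variant pads the tail with `-1`, the new one keeps cycling through real values.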





[GitHub] [kafka] fqaiser94 commented on a diff in pull request #13656: KAFKA-14911: Add system tests for rolling upgrade path of KIP-904

Posted by "fqaiser94 (via GitHub)" <gi...@apache.org>.
fqaiser94 commented on code in PR #13656:
URL: https://github.com/apache/kafka/pull/13656#discussion_r1190412985


##########
tests/kafkatest/tests/streams/streams_upgrade_test.py:
##########
@@ -236,6 +237,96 @@ def test_rolling_upgrade_with_2_bounces(self, from_version, to_version):
 
         self.stop_and_await()
 
+    @cluster(num_nodes=6)
+    @matrix(from_version=table_agg_versions, to_version=[str(DEV_VERSION)])
+    def test_rolling_upgrade_for_table_agg(self, from_version, to_version):
+        """
+        This test verifies that the cluster successfully upgrades despite changes in the table
+        repartition topic format.
+
+        Starts 3 KafkaStreams instances with version <from_version> and upgrades one-by-one to <to_version>
+        """
+
+        extra_properties = {'test.run_table_agg': 'true'}
+
+        self.set_up_services()
+
+        self.driver.start()
+
+        # encoding different target values for different versions
+        #  - old version: value=A
+        #  - new version with `upgrade_from` flag set: value=B
+        #  - new version w/o `upgrade_from` set set: value=C
+
+        extra_properties = extra_properties.copy()
+        extra_properties['test.agg_produce_value'] = 'A'
+        extra_properties['test.expected_agg_values'] = 'A'
+        self.start_all_nodes_with(from_version, extra_properties)
+
+        counter = 1
+        random.seed()
+
+        # rolling bounce
+        random.shuffle(self.processors)
+        p3 = self.processors[-1]
+        for p in self.processors:
+            p.CLEAN_NODE_ENABLED = False
+

Review Comment:
   ```suggestion
   self.wait_for_table_agg_success('A')
   ```
   
   nit: could we add this here? 
   This would just ensure that at this point all instances have processed a message and their aggregates look like `Agg(List('A'))`. 



##########
tests/kafkatest/tests/streams/streams_upgrade_test.py:
##########
@@ -236,6 +237,96 @@ def test_rolling_upgrade_with_2_bounces(self, from_version, to_version):
 
         self.stop_and_await()
 
+    @cluster(num_nodes=6)
+    @matrix(from_version=table_agg_versions, to_version=[str(DEV_VERSION)])
+    def test_rolling_upgrade_for_table_agg(self, from_version, to_version):
+        """
+        This test verifies that the cluster successfully upgrades despite changes in the table
+        repartition topic format.
+
+        Starts 3 KafkaStreams instances with version <from_version> and upgrades one-by-one to <to_version>
+        """
+
+        extra_properties = {'test.run_table_agg': 'true'}
+
+        self.set_up_services()
+
+        self.driver.start()
+
+        # encoding different target values for different versions
+        #  - old version: value=A
+        #  - new version with `upgrade_from` flag set: value=B
+        #  - new version w/o `upgrade_from` set set: value=C
+
+        extra_properties = extra_properties.copy()
+        extra_properties['test.agg_produce_value'] = 'A'
+        extra_properties['test.expected_agg_values'] = 'A'
+        self.start_all_nodes_with(from_version, extra_properties)
+
+        counter = 1
+        random.seed()
+
+        # rolling bounce
+        random.shuffle(self.processors)
+        p3 = self.processors[-1]
+        for p in self.processors:
+            p.CLEAN_NODE_ENABLED = False
+
+        # bounce two instances to new version (verifies that new version can process records
+        # written by old version)
+        extra_properties = extra_properties.copy()
+        extra_properties['test.agg_produce_value'] = 'B'
+        extra_properties['test.expected_agg_values'] = 'A,B'
+        for p in self.processors[:-1]:
+            self.do_stop_start_bounce(p, from_version[:-2], to_version, counter, extra_properties)
+            counter = counter + 1
+
+        # bounce remaining instance on old version (just for verification purposes, to verify that
+        # instance on old version can process records written by new version)
+        extra_properties = extra_properties.copy()
+        extra_properties['test.agg_produce_value'] = 'A'
+        extra_properties['test.expected_agg_values'] = 'A,B'
+        self.do_stop_start_bounce(p3, None, from_version, counter, extra_properties)
+        counter = counter + 1
+
+        self.wait_for_table_agg_success('A,B')

Review Comment:
   ```suggestion
           # bounce remaining instance on old version to produce a new unique value
           extra_properties = extra_properties.copy()
           extra_properties['test.agg_produce_value'] = 'C'
           extra_properties['test.expected_agg_values'] = 'A,B,C'
           self.do_stop_start_bounce(p3, None, from_version, counter, extra_properties)
           counter = counter + 1
           
           # verify that new version can process records from old version
           self.wait_for_table_agg_success('A,B,C')
   ```
   
   I think it might be better to have this old (not-upgraded) instance start producing a new value when we bounce it here. That way, we can assert using `self.wait_for_table_agg_success('A,B,C')` and be sure that the two upgraded instances have successfully processed messages from the old (not-upgraded) instance as well. 
   
   (Note, if you accept this change, you will need to make changes below here to produce new unique values like D,E,F, etc.)



##########
tests/kafkatest/tests/streams/streams_upgrade_test.py:
##########
@@ -236,6 +237,96 @@ def test_rolling_upgrade_with_2_bounces(self, from_version, to_version):
 
         self.stop_and_await()
 
+    @cluster(num_nodes=6)
+    @matrix(from_version=table_agg_versions, to_version=[str(DEV_VERSION)])
+    def test_rolling_upgrade_for_table_agg(self, from_version, to_version):
+        """
+        This test verifies that the cluster successfully upgrades despite changes in the table
+        repartition topic format.
+
+        Starts 3 KafkaStreams instances with version <from_version> and upgrades one-by-one to <to_version>
+        """
+
+        extra_properties = {'test.run_table_agg': 'true'}
+
+        self.set_up_services()
+
+        self.driver.start()
+
+        # encoding different target values for different versions
+        #  - old version: value=A
+        #  - new version with `upgrade_from` flag set: value=B
+        #  - new version w/o `upgrade_from` set set: value=C
+
+        extra_properties = extra_properties.copy()
+        extra_properties['test.agg_produce_value'] = 'A'
+        extra_properties['test.expected_agg_values'] = 'A'
+        self.start_all_nodes_with(from_version, extra_properties)
+
+        counter = 1
+        random.seed()
+
+        # rolling bounce
+        random.shuffle(self.processors)
+        p3 = self.processors[-1]
+        for p in self.processors:
+            p.CLEAN_NODE_ENABLED = False
+
+        # bounce two instances to new version (verifies that new version can process records
+        # written by old version)
+        extra_properties = extra_properties.copy()
+        extra_properties['test.agg_produce_value'] = 'B'
+        extra_properties['test.expected_agg_values'] = 'A,B'
+        for p in self.processors[:-1]:
+            self.do_stop_start_bounce(p, from_version[:-2], to_version, counter, extra_properties)
+            counter = counter + 1
+

Review Comment:
   ```suggestion
   # verify that old version can process records from new version
   self.wait_for_table_agg_success('A,B')
   ```
   
   Can we just assert this condition here? 
   I don't think we need to bounce the remaining instance before asserting this condition. 



##########
tests/kafkatest/tests/streams/streams_upgrade_test.py:
##########
@@ -236,6 +237,96 @@ def test_rolling_upgrade_with_2_bounces(self, from_version, to_version):
 
         self.stop_and_await()
 
+    @cluster(num_nodes=6)
+    @matrix(from_version=table_agg_versions, to_version=[str(DEV_VERSION)])
+    def test_rolling_upgrade_for_table_agg(self, from_version, to_version):
+        """
+        This test verifies that the cluster successfully upgrades despite changes in the table
+        repartition topic format.
+
+        Starts 3 KafkaStreams instances with version <from_version> and upgrades one-by-one to <to_version>
+        """
+
+        extra_properties = {'test.run_table_agg': 'true'}
+
+        self.set_up_services()
+
+        self.driver.start()
+
+        # encoding different target values for different versions
+        #  - old version: value=A
+        #  - new version with `upgrade_from` flag set: value=B
+        #  - new version w/o `upgrade_from` set set: value=C
+
+        extra_properties = extra_properties.copy()
+        extra_properties['test.agg_produce_value'] = 'A'
+        extra_properties['test.expected_agg_values'] = 'A'
+        self.start_all_nodes_with(from_version, extra_properties)
+
+        counter = 1
+        random.seed()
+
+        # rolling bounce
+        random.shuffle(self.processors)
+        p3 = self.processors[-1]
+        for p in self.processors:
+            p.CLEAN_NODE_ENABLED = False

Review Comment:
   ```suggestion
           random.shuffle(self.processors)
           p3 = self.processors[-1]
           for p in self.processors:
               p.CLEAN_NODE_ENABLED = False
   ```
   
   It seems to me these lines of code have nothing to do with executing a "rolling bounce"?



##########
tests/kafkatest/tests/streams/streams_upgrade_test.py:
##########
@@ -236,6 +237,96 @@ def test_rolling_upgrade_with_2_bounces(self, from_version, to_version):
 
         self.stop_and_await()
 
+    @cluster(num_nodes=6)
+    @matrix(from_version=table_agg_versions, to_version=[str(DEV_VERSION)])
+    def test_rolling_upgrade_for_table_agg(self, from_version, to_version):
+        """
+        This test verifies that the cluster successfully upgrades despite changes in the table
+        repartition topic format.
+
+        Starts 3 KafkaStreams instances with version <from_version> and upgrades one-by-one to <to_version>
+        """
+
+        extra_properties = {'test.run_table_agg': 'true'}
+
+        self.set_up_services()
+
+        self.driver.start()
+
+        # encoding different target values for different versions
+        #  - old version: value=A
+        #  - new version with `upgrade_from` flag set: value=B
+        #  - new version w/o `upgrade_from` set set: value=C

Review Comment:
   ```suggestion
           #  - new version w/o `upgrade_from` flag set: value=C
   ```



##########
streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java:
##########
@@ -143,6 +173,94 @@ private static void buildFKTable(final KStream<String, Integer> primaryTable,
         kStream.to("fk-result", Produced.with(stringSerde, stringSerde));
     }
 
+    private static void buildTableAgg(final KTable<String, Integer> sourceTable,
+                                      final String aggProduceValue,
+                                      final List<String> expectedAggValues) {
+        final KStream<Integer, String> result = sourceTable
+            .groupBy(
+                (k, v) -> new KeyValue<>(v, aggProduceValue),
+                Grouped.with(intSerde, stringSerde))
+            .aggregate(
+                () -> new Agg(Collections.emptyList(), 0),
+                (k, v, agg) -> {
+                    final List<String> seenValues;
+                    final boolean updated;
+                    if (!agg.seenValues.contains(v)) {
+                        seenValues = new ArrayList<>(agg.seenValues);
+                        seenValues.add(v);
+                        Collections.sort(seenValues);

Review Comment:
   Is there a reason we **_need_** to sort? 
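One possible reason, which is an assumption here and not something the PR states: if the seen values are later compared against an expected string such as `'A,B,C'`, the result must not depend on the order in which records arrive. The Python sketch below (hypothetical helpers `add_value` and `run`, not the Java code under review) shows how the joined string differs by arrival order without sorting and is stable with it.

```python
def add_value(seen, value, sort):
    """Append `value` if it is new; optionally keep the list sorted (as Collections.sort would)."""
    if value in seen:
        return seen
    updated = seen + [value]
    return sorted(updated) if sort else updated


def run(arrivals, sort):
    """Fold a sequence of arriving values into the comma-joined 'seen' string."""
    seen = []
    for v in arrivals:
        seen = add_value(seen, v, sort)
    return ",".join(seen)


unsorted_1 = run(["A", "B", "C"], sort=False)
unsorted_2 = run(["B", "A", "C"], sort=False)
sorted_1 = run(["A", "B", "C"], sort=True)
sorted_2 = run(["B", "A", "C"], sort=True)

print(unsorted_1, unsorted_2)  # A,B,C B,A,C
print(sorted_1, sorted_2)      # A,B,C A,B,C
```

If the comparison were done on sets instead of ordered lists or strings, the sort would indeed be unnecessary.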



##########
tests/kafkatest/tests/streams/streams_upgrade_test.py:
##########
@@ -236,6 +237,96 @@ def test_rolling_upgrade_with_2_bounces(self, from_version, to_version):
 
         self.stop_and_await()
 
+    @cluster(num_nodes=6)
+    @matrix(from_version=table_agg_versions, to_version=[str(DEV_VERSION)])
+    def test_rolling_upgrade_for_table_agg(self, from_version, to_version):
+        """
+        This test verifies that the cluster successfully upgrades despite changes in the table
+        repartition topic format.
+
+        Starts 3 KafkaStreams instances with version <from_version> and upgrades one-by-one to <to_version>
+        """
+
+        extra_properties = {'test.run_table_agg': 'true'}
+
+        self.set_up_services()
+
+        self.driver.start()
+
+        # encoding different target values for different versions
+        #  - old version: value=A
+        #  - new version with `upgrade_from` flag set: value=B
+        #  - new version w/o `upgrade_from` set set: value=C
+
+        extra_properties = extra_properties.copy()
+        extra_properties['test.agg_produce_value'] = 'A'
+        extra_properties['test.expected_agg_values'] = 'A'
+        self.start_all_nodes_with(from_version, extra_properties)
+
+        counter = 1
+        random.seed()
+
+        # rolling bounce
+        random.shuffle(self.processors)
+        p3 = self.processors[-1]
+        for p in self.processors:
+            p.CLEAN_NODE_ENABLED = False
+
+        # bounce two instances to new version (verifies that new version can process records
+        # written by old version)
+        extra_properties = extra_properties.copy()
+        extra_properties['test.agg_produce_value'] = 'B'
+        extra_properties['test.expected_agg_values'] = 'A,B'
+        for p in self.processors[:-1]:
+            self.do_stop_start_bounce(p, from_version[:-2], to_version, counter, extra_properties)
+            counter = counter + 1
+
+        # bounce remaining instance on old version (just for verification purposes, to verify that
+        # instance on old version can process records written by new version)
+        extra_properties = extra_properties.copy()
+        extra_properties['test.agg_produce_value'] = 'A'
+        extra_properties['test.expected_agg_values'] = 'A,B'
+        self.do_stop_start_bounce(p3, None, from_version, counter, extra_properties)
+        counter = counter + 1
+
+        self.wait_for_table_agg_success('A,B')
+
+        # bounce remaining instance to new version (verifies that new version without upgrade_from
+        # can process records written by new version with upgrade_from)
+        extra_properties = extra_properties.copy()
+        extra_properties['test.agg_produce_value'] = 'C'
+        extra_properties['test.expected_agg_values'] = 'A,B,C'
+        self.do_stop_start_bounce(p3, None, to_version, counter, extra_properties)
+        counter = counter + 1
+
+        # bounce first instances again without removing upgrade_from (just for verification purposes,
+        # to verify that instance with upgrade_from can process records written without upgrade_from)
+        extra_properties = extra_properties.copy()
+        extra_properties['test.agg_produce_value'] = 'B'
+        extra_properties['test.expected_agg_values'] = 'A,B,C'
+        for p in self.processors[:-1]:
+            self.do_stop_start_bounce(p, from_version[:-2], to_version, counter, extra_properties)
+            counter = counter + 1
+
+        self.wait_for_table_agg_success('A,B,C')
+
+        self.stop_and_await()
+
+    def wait_for_table_agg_success(self, expected_values):
+        agg_success_str = "Table aggregate processor saw expected values. Seen: " + expected_values

Review Comment:
   Surprised you don't need to specify the full string? I guess we just check for any lines beginning with `agg_success_str` rather than lines that are an exact match with `agg_success_str`. 
   
   https://github.com/apache/kafka/blob/a70e6ba464cc6ef2196a7c2e419a308b627f41f1/streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java#L201
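To make the guess above concrete, here is a small Python sketch of substring-style matching, which is what a grep-like check would do. This is an assumption about the matching semantics, not a description of the actual log monitor; `first_match` and the trailing text in the sample log lines are hypothetical.

```python
def first_match(log_lines, expected):
    """Return the first line that contains `expected` as a substring, or None."""
    for line in log_lines:
        if expected in line:
            return line
    return None


prefix = "Table aggregate processor saw expected values. Seen: "
agg_success_str = prefix + "A,B"

# Hypothetical log content: the text after the seen values is made up for this sketch.
log_lines = [
    "some unrelated line",
    agg_success_str + " <hypothetical trailing text>",
]

substring_hit = first_match(log_lines, agg_success_str) is not None
exact_hit = agg_success_str in log_lines  # exact whole-line comparison

# Side effect of substring matching: 'A,B' also matches a line reporting 'A,B,C'.
later_lines = [prefix + "A,B,C"]
prefix_overlap = first_match(later_lines, agg_success_str) is not None

print(substring_hit, exact_hit, prefix_overlap)  # True False True
```

If the check really is substring-based, waiting for `'A,B'` would also be satisfied by a line that reports `'A,B,C'`, which may be worth keeping in mind when choosing the expected values for each phase.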





[GitHub] [kafka] fqaiser94 commented on a diff in pull request #13656: KAFKA-14911: Add system tests for rolling upgrade path of KIP-904

Posted by "fqaiser94 (via GitHub)" <gi...@apache.org>.
fqaiser94 commented on code in PR #13656:
URL: https://github.com/apache/kafka/pull/13656#discussion_r1190413444


##########
tests/kafkatest/tests/streams/streams_upgrade_test.py:
##########
@@ -236,6 +237,96 @@ def test_rolling_upgrade_with_2_bounces(self, from_version, to_version):
 
         self.stop_and_await()
 
+    @cluster(num_nodes=6)
+    @matrix(from_version=table_agg_versions, to_version=[str(DEV_VERSION)])
+    def test_rolling_upgrade_for_table_agg(self, from_version, to_version):
+        """
+        This test verifies that the cluster successfully upgrades despite changes in the table
+        repartition topic format.
+
+        Starts 3 KafkaStreams instances with version <from_version> and upgrades one-by-one to <to_version>
+        """
+
+        extra_properties = {'test.run_table_agg': 'true'}
+
+        self.set_up_services()
+
+        self.driver.start()
+
+        # encoding different target values for different versions
+        #  - old version: value=A
+        #  - new version with `upgrade_from` flag set: value=B
+        #  - new version w/o `upgrade_from` set set: value=C
+
+        extra_properties = extra_properties.copy()
+        extra_properties['test.agg_produce_value'] = 'A'
+        extra_properties['test.expected_agg_values'] = 'A'
+        self.start_all_nodes_with(from_version, extra_properties)
+
+        counter = 1
+        random.seed()
+
+        # rolling bounce
+        random.shuffle(self.processors)
+        p3 = self.processors[-1]
+        for p in self.processors:
+            p.CLEAN_NODE_ENABLED = False
+
+        # bounce two instances to new version (verifies that new version can process records
+        # written by old version)
+        extra_properties = extra_properties.copy()
+        extra_properties['test.agg_produce_value'] = 'B'
+        extra_properties['test.expected_agg_values'] = 'A,B'
+        for p in self.processors[:-1]:
+            self.do_stop_start_bounce(p, from_version[:-2], to_version, counter, extra_properties)
+            counter = counter + 1
+

Review Comment:
   ```suggestion
   # verify that old version can process records from new version
   self.wait_for_table_agg_success('A,B')
   ```
   
   Can we assert this condition earlier, like here? 
   I don't think we need to bounce the remaining instance before asserting this condition. 
   
   Here's how I'm thinking about it, if it helps: 
   ![image](https://github.com/apache/kafka/assets/20507243/7277a4df-0c93-4a42-8f7c-4d7adb02f6a6)
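
The reasoning behind asserting earlier can be sketched with a small model of the bounce schedule. This is only an illustration, not part of the PR: the phase list is paraphrased from the comments in `test_rolling_upgrade_for_table_agg`, and it assumes (as a simplification) that every value produced by any instance eventually reaches every aggregate.

```python
# Each phase lists the value produced by each of the three instances.
# Descriptions are paraphrased from the test's comments; the model itself is hypothetical.
phases = [
    ("all three instances on old version", ["A", "A", "A"]),
    ("two instances bounced to new version with upgrade_from", ["B", "B", "A"]),
    ("remaining instance bounced on old version", ["B", "B", "A"]),
    ("remaining instance bounced to new version w/o upgrade_from", ["B", "B", "C"]),
    ("first two instances bounced again, keeping upgrade_from", ["B", "B", "C"]),
]

seen = set()
expected_per_phase = []
for description, produced in phases:
    # The aggregate only ever accumulates values, so the expectation is a running union.
    seen.update(produced)
    expected = ",".join(sorted(seen))
    expected_per_phase.append(expected)
    print(f"{description}: expect {expected}")
```

Under this model the expectation is already `A,B` after the second phase, which is the point at which the suggestion places `wait_for_table_agg_success('A,B')`.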
   
   
   





[GitHub] [kafka] mjsax commented on a diff in pull request #13656: KAFKA-14911: Add system tests for rolling upgrade path of KIP-904

Posted by "mjsax (via GitHub)" <gi...@apache.org>.
mjsax commented on code in PR #13656:
URL: https://github.com/apache/kafka/pull/13656#discussion_r1183045333


##########
tests/kafkatest/tests/streams/streams_upgrade_test.py:
##########
@@ -41,6 +41,7 @@
 metadata_2_versions = [str(LATEST_0_10_1), str(LATEST_0_10_2), str(LATEST_0_11_0), str(LATEST_1_0), str(LATEST_1_1)]
 fk_join_versions = [str(LATEST_2_4), str(LATEST_2_5), str(LATEST_2_6), str(LATEST_2_7), str(LATEST_2_8), 
                     str(LATEST_3_0), str(LATEST_3_1), str(LATEST_3_2), str(LATEST_3_3)]
+table_agg_versions = [str(LATEST_3_3)]

Review Comment:
   Guess it should not be different -- but in the past, we basically tested all versions -- if we think it's too excessive, we can also cut down the test matrix in general.
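
To get a feel for the cost being discussed, here is a rough sketch of how the number of test runs grows with the version list. It assumes that `@matrix` runs the test once per combination of its parameter values; the `expand` helper and the short version labels are stand-ins for illustration, not kafkatest code or the real version strings.

```python
from itertools import product

# Stand-in labels for the kafkatest version constants (LATEST_2_4, ..., LATEST_3_3).
fk_join_versions = ["2.4", "2.5", "2.6", "2.7", "2.8", "3.0", "3.1", "3.2", "3.3"]
table_agg_versions = ["3.3"]
to_versions = ["dev"]


def expand(**params):
    """Hypothetical helper: one dict per combination of parameter values."""
    names = list(params)
    return [dict(zip(names, combo)) for combo in product(*params.values())]


current = expand(from_version=table_agg_versions, to_version=to_versions)
full = expand(from_version=fk_join_versions, to_version=to_versions)
print(len(current), len(full))
```

With a single `to_version`, each added `from_version` costs one more full run of the rolling-upgrade test.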





[GitHub] [kafka] vcrfxia commented on a diff in pull request #13656: KAFKA-14911: Add system tests for rolling upgrade path of KIP-904

Posted by "vcrfxia (via GitHub)" <gi...@apache.org>.
vcrfxia commented on code in PR #13656:
URL: https://github.com/apache/kafka/pull/13656#discussion_r1183072852


##########
tests/kafkatest/tests/streams/streams_upgrade_test.py:
##########
@@ -41,6 +41,7 @@
 metadata_2_versions = [str(LATEST_0_10_1), str(LATEST_0_10_2), str(LATEST_0_11_0), str(LATEST_1_0), str(LATEST_1_1)]
 fk_join_versions = [str(LATEST_2_4), str(LATEST_2_5), str(LATEST_2_6), str(LATEST_2_7), str(LATEST_2_8), 
                     str(LATEST_3_0), str(LATEST_3_1), str(LATEST_3_2), str(LATEST_3_3)]
+table_agg_versions = [str(LATEST_3_3)]

Review Comment:
   You have a better sense of how long these tests take to run vs how much additional value testing multiple older versions gives, so I trust your judgment. My instinct would've been to say that since this is a new test, we don't need to add in the older versions unless we expect them to be different, but no strong preference :)





[GitHub] [kafka] mjsax commented on a diff in pull request #13656: KAFKA-14911: Add system tests for rolling upgrade path of KIP-904

Posted by "mjsax (via GitHub)" <gi...@apache.org>.
mjsax commented on code in PR #13656:
URL: https://github.com/apache/kafka/pull/13656#discussion_r1182064875


##########
streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java:
##########
@@ -126,7 +130,7 @@ static void generatePerpetually(final String kafka,
             data[i] = new ValueList(i, i + maxRecordsPerKey - 1);
         }
 
-        final Random rand = new Random();
+        final Random rand = new Random(System.currentTimeMillis());

Review Comment:
   Minor side improvement



##########
streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java:
##########
@@ -143,6 +173,94 @@ private static void buildFKTable(final KStream<String, Integer> primaryTable,
         kStream.to("fk-result", Produced.with(stringSerde, stringSerde));
     }
 
+    private static void buildTableAgg(final KTable<String, Integer> sourceTable,
+                                      final String aggProduceValue,
+                                      final List<String> expectedAggValues) {
+        final KStream<Integer, String> result = sourceTable
+            .groupBy(
+                (k, v) -> new KeyValue<>(v, aggProduceValue),

Review Comment:
   Changed this to use `v` as key -- works just fine



##########
streams/upgrade-system-tests-33/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java:
##########
@@ -60,6 +71,9 @@ public static void main(final String[] args) throws Exception {
         final boolean runFkJoin = Boolean.parseBoolean(streamsProperties.getProperty(
             "test.run_fk_join",
             "false"));
+        final boolean runTableAgg = Boolean.parseBoolean(streamsProperties.getProperty(

Review Comment:
   Backported the table-aggregation step to older versions -- without it, the first app instances we start up don't have it.
   
   This must be done for other older versions we want to test, too.



##########
tests/kafkatest/tests/streams/streams_upgrade_test.py:
##########
@@ -41,6 +41,7 @@
 metadata_2_versions = [str(LATEST_0_10_1), str(LATEST_0_10_2), str(LATEST_0_11_0), str(LATEST_1_0), str(LATEST_1_1)]
 fk_join_versions = [str(LATEST_2_4), str(LATEST_2_5), str(LATEST_2_6), str(LATEST_2_7), str(LATEST_2_8), 
                     str(LATEST_3_0), str(LATEST_3_1), str(LATEST_3_2), str(LATEST_3_3)]
+table_agg_versions = [str(LATEST_3_3)]

Review Comment:
   We should add more versions here -- not sure how far back we want to go?



##########
streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java:
##########
@@ -143,6 +173,94 @@ private static void buildFKTable(final KStream<String, Integer> primaryTable,
         kStream.to("fk-result", Produced.with(stringSerde, stringSerde));
     }
 
+    private static void buildTableAgg(final KTable<String, Integer> sourceTable,
+                                      final String aggProduceValue,
+                                      final List<String> expectedAggValues) {
+        final KStream<Integer, String> result = sourceTable
+            .groupBy(
+                (k, v) -> new KeyValue<>(v, aggProduceValue),
+                Grouped.with(intSerde, stringSerde))
+            .aggregate(
+                () -> new Agg(Collections.emptyList(), 0),
+                (k, v, agg) -> {
+                    final List<String> seenValues;
+                    final boolean updated;
+                    if (!agg.seenValues.contains(v)) {
+                        seenValues = new ArrayList<>(agg.seenValues);
+                        seenValues.add(v);
+                        Collections.sort(seenValues);
+                        updated = true;
+                    } else {
+                        seenValues = agg.seenValues;
+                        updated = false;
+                    }
+
+                    final boolean shouldLog = updated || (agg.recordsProcessed % 10 == 0); // value of 10 is chosen for debugging purposes. can increase to 100 once test is passing.
+                    if (shouldLog) {

Review Comment:
   Changed this slightly to avoid spamming the output



##########
tests/kafkatest/tests/streams/streams_upgrade_test.py:
##########
@@ -175,7 +176,7 @@ def test_upgrade_downgrade_brokers(self, from_version, to_version):
                     self.perform_broker_upgrade(to_version)
 
                     log_monitor.wait_until(connected_message,
-                                           timeout_sec=120,
+                                           timeout_sec=60,

Review Comment:
   Not sure why this timeout was higher than all others. Side cleanup



##########
tests/kafkatest/tests/streams/streams_upgrade_test.py:
##########
@@ -236,6 +237,96 @@ def test_rolling_upgrade_with_2_bounces(self, from_version, to_version):
 
         self.stop_and_await()
 
+    @cluster(num_nodes=6)
+    @matrix(from_version=table_agg_versions, to_version=[str(DEV_VERSION)])
+    def test_rolling_upgrade_for_table_agg(self, from_version, to_version):
+        """
+        This test verifies that the cluster successfully upgrades despite changes in the table
+        repartition topic format.
+
+        Starts 3 KafkaStreams instances with version <from_version> and upgrades one-by-one to <to_version>
+        """
+
+        extra_properties = {'test.run_table_agg': 'true'}
+
+        self.set_up_services()
+
+        self.driver.start()
+
+        # encoding different target values for different versions
+        #  - old version: value=A
+        #  - new version with `upgrade_from` flag set: value=B
+        #  - new version w/o `upgrade_from` set set: value=C
+
+        extra_properties = extra_properties.copy()
+        extra_properties['test.agg_produce_value'] = 'A'
+        extra_properties['test.expected_agg_values'] = 'A'
+        self.start_all_nodes_with(from_version, extra_properties)
+
+        counter = 1
+        random.seed()
+
+        # rolling bounce
+        random.shuffle(self.processors)
+        p3 = self.processors[-1]
+        for p in self.processors:
+            p.CLEAN_NODE_ENABLED = False
+
+        # bounce two instances to new version (verifies that new version can process records
+        # written by old version)
+        extra_properties = extra_properties.copy()
+        extra_properties['test.agg_produce_value'] = 'B'
+        extra_properties['test.expected_agg_values'] = 'A,B'
+        for p in self.processors[:-1]:
+            self.do_stop_start_bounce(p, from_version[:-2], to_version, counter, extra_properties)
+            counter = counter + 1
+
+        # bounce remaining instance on old version (just for verification purposes, to verify that
+        # instance on old version can process records written by new version)
+        extra_properties = extra_properties.copy()
+        extra_properties['test.agg_produce_value'] = 'A'
+        extra_properties['test.expected_agg_values'] = 'A,B'
+        self.do_stop_start_bounce(p3, None, from_version, counter, extra_properties)
+        counter = counter + 1
+
+        self.wait_for_table_agg_success('A,B')
+
+        # bounce remaining instance to new version (verifies that new version without upgrade_from
+        # can process records written by new version with upgrade_from)
+        extra_properties = extra_properties.copy()
+        extra_properties['test.agg_produce_value'] = 'C'
+        extra_properties['test.expected_agg_values'] = 'A,B,C'
+        self.do_stop_start_bounce(p3, None, to_version, counter, extra_properties)
+        counter = counter + 1
+
+        # bounce first instances again without removing upgrade_from (just for verification purposes,
+        # to verify that instance with upgrade_from can process records written without upgrade_from)
+        extra_properties = extra_properties.copy()
+        extra_properties['test.agg_produce_value'] = 'B'
+        extra_properties['test.expected_agg_values'] = 'A,B,C'
+        for p in self.processors[:-1]:
+            self.do_stop_start_bounce(p, from_version[:-2], to_version, counter, extra_properties)
+            counter = counter + 1
+
+        self.wait_for_table_agg_success('A,B,C')
+
+        self.stop_and_await()
+
+    def wait_for_table_agg_success(self, expected_values):
+        agg_success_str = "Table aggregate processor saw expected values. Seen: " + expected_values
+        with self.processor1.node.account.monitor_log(self.processor1.STDOUT_FILE) as first_monitor:
+            with self.processor2.node.account.monitor_log(self.processor2.STDOUT_FILE) as second_monitor:
+                with self.processor3.node.account.monitor_log(self.processor3.STDOUT_FILE) as third_monitor:

Review Comment:
   All three monitors need to tail STDOUT (not the LOG files)



##########
streams/upgrade-system-tests-33/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java:
##########
@@ -69,14 +83,32 @@ public static void main(final String[] args) throws Exception {
                 System.err.println("Caught " + e.getMessage());
             }
         }
+        if (runTableAgg) {
+            final String aggProduceValue = streamsProperties.getProperty("test.agg_produce_value", "");
+            if (aggProduceValue.isEmpty()) {
+                System.err.printf("'%s' must be specified when '%s' is true.", "test.agg_produce_value", "test.run_table_agg");
+            }
+            final String expectedAggValuesStr = streamsProperties.getProperty("test.expected_agg_values", "");
+            if (expectedAggValuesStr.isEmpty()) {
+                System.err.printf("'%s' must be specified when '%s' is true.", "test.expected_agg_values", "test.run_table_agg");
+            }
+            final List<String> expectedAggValues = Arrays.asList(expectedAggValuesStr.split(","));
+
+            try {
+                buildTableAgg(dataTable, aggProduceValue, expectedAggValues);
+            } catch (final Exception e) {
+                System.err.println("Caught " + e.getMessage());
+            }
+        }
 
         final Properties config = new Properties();
         config.setProperty(
             StreamsConfig.APPLICATION_ID_CONFIG,
-            "StreamsUpgradeTest-" + new Random().nextLong());
+            "StreamsUpgradeTest");

Review Comment:
   Fix application.id (cf https://github.com/apache/kafka/pull/13654)





[GitHub] [kafka] mjsax commented on a diff in pull request #13656: KAFKA-14911: Add system tests for rolling upgrade path of KIP-904

Posted by "mjsax (via GitHub)" <gi...@apache.org>.
mjsax commented on code in PR #13656:
URL: https://github.com/apache/kafka/pull/13656#discussion_r1185621879


##########
tests/kafkatest/tests/streams/streams_upgrade_test.py:
##########
@@ -236,6 +237,96 @@ def test_rolling_upgrade_with_2_bounces(self, from_version, to_version):
 
         self.stop_and_await()
 
+    @cluster(num_nodes=6)
+    @matrix(from_version=table_agg_versions, to_version=[str(DEV_VERSION)])
+    def test_rolling_upgrade_for_table_agg(self, from_version, to_version):
+        """
+        This test verifies that the cluster successfully upgrades despite changes in the table
+        repartition topic format.
+
+        Starts 3 KafkaStreams instances with version <from_version> and upgrades one-by-one to <to_version>
+        """
+
+        extra_properties = {'test.run_table_agg': 'true'}
+
+        self.set_up_services()
+
+        self.driver.start()
+
+        # encoding different target values for different versions
+        #  - old version: value=A
+        #  - new version with `upgrade_from` flag set: value=B
+        #  - new version w/o `upgrade_from` set set: value=C
+
+        extra_properties = extra_properties.copy()
+        extra_properties['test.agg_produce_value'] = 'A'
+        extra_properties['test.expected_agg_values'] = 'A'
+        self.start_all_nodes_with(from_version, extra_properties)
+
+        counter = 1
+        random.seed()
+
+        # rolling bounce
+        random.shuffle(self.processors)
+        p3 = self.processors[-1]
+        for p in self.processors:
+            p.CLEAN_NODE_ENABLED = False
+
+        # bounce two instances to new version (verifies that new version can process records
+        # written by old version)
+        extra_properties = extra_properties.copy()
+        extra_properties['test.agg_produce_value'] = 'B'
+        extra_properties['test.expected_agg_values'] = 'A,B'
+        for p in self.processors[:-1]:
+            self.do_stop_start_bounce(p, from_version[:-2], to_version, counter, extra_properties)

Review Comment:
   Yes, `[n:m]` returns a sub-string (`from_version` is a string) from index `n` to `m` (`m` exclusive). If you omit `n`, it starts from the beginning; if you omit `m`, it runs to the end.
   
   A negative index counts from the end, so `[:-2]` runs from the beginning up to, but not including, the second-to-last character, i.e., we cut off the last two chars, which are the `.x` bug-fix suffix.
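
A short illustration of the slicing described above. The version string is a made-up example; the real value comes from the kafkatest version constants.

```python
# Hypothetical example value; the test receives this via the @matrix parameters.
from_version = "3.3.2"

# Drop the last two characters, i.e. the ".x" bug-fix suffix.
upgrade_from = from_version[:-2]
print(upgrade_from)

# Omitting one bound: from the beginning, or to the end.
prefix = from_version[:3]
suffix = from_version[4:]
print(prefix, suffix)

# Note: a fixed two-character cut assumes a single-digit bug-fix number.
# Splitting on the last dot would not depend on that assumption.
feature_version = "3.3.10".rsplit(".", 1)[0]
print(feature_version)
```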





[GitHub] [kafka] mjsax commented on a diff in pull request #13656: KAFKA-14911: Add system tests for rolling upgrade path of KIP-904

Posted by "mjsax (via GitHub)" <gi...@apache.org>.
mjsax commented on code in PR #13656:
URL: https://github.com/apache/kafka/pull/13656#discussion_r1185622586


##########
tests/kafkatest/tests/streams/streams_upgrade_test.py:
##########
@@ -236,6 +237,96 @@ def test_rolling_upgrade_with_2_bounces(self, from_version, to_version):
 
         self.stop_and_await()
 
+    @cluster(num_nodes=6)
+    @matrix(from_version=table_agg_versions, to_version=[str(DEV_VERSION)])
+    def test_rolling_upgrade_for_table_agg(self, from_version, to_version):
+        """
+        This test verifies that the cluster successfully upgrades despite changes in the table
+        repartition topic format.
+
+        Starts 3 KafkaStreams instances with version <from_version> and upgrades one-by-one to <to_version>
+        """
+
+        extra_properties = {'test.run_table_agg': 'true'}
+
+        self.set_up_services()
+
+        self.driver.start()
+
+        # encoding different target values for different versions
+        #  - old version: value=A
+        #  - new version with `upgrade_from` flag set: value=B
+        #  - new version w/o `upgrade_from` set set: value=C
+
+        extra_properties = extra_properties.copy()
+        extra_properties['test.agg_produce_value'] = 'A'
+        extra_properties['test.expected_agg_values'] = 'A'
+        self.start_all_nodes_with(from_version, extra_properties)
+
+        counter = 1
+        random.seed()
+
+        # rolling bounce
+        random.shuffle(self.processors)
+        p3 = self.processors[-1]
+        for p in self.processors:
+            p.CLEAN_NODE_ENABLED = False
+
+        # bounce two instances to new version (verifies that new version can process records
+        # written by old version)

Review Comment:
   We have a guarantee. Inside `do_stop_start_bounce` we wait until the processor prints that it has processed records.
   
   Not sure if we want to tighten this check, but I think it should be ok?
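
For readers unfamiliar with this kind of guarantee, the "wait until the processor prints something" pattern can be sketched as below. This is not the ducktape implementation: `wait_for_line`, the file name, and the message text are all hypothetical, and the sketch scans the whole file, whereas the log monitor used in the test is, as far as I understand, limited to output written after monitoring starts.

```python
import os
import tempfile
import time


def wait_for_line(path, needle, timeout_sec=5.0, poll_sec=0.1):
    """Return True once `needle` appears in the file at `path`, False on timeout."""
    deadline = time.monotonic() + timeout_sec
    while time.monotonic() < deadline:
        if os.path.exists(path):
            with open(path) as f:
                if any(needle in line for line in f):
                    return True
        time.sleep(poll_sec)
    return False


# Usage with a stand-in stdout file; the message text is made up.
with tempfile.TemporaryDirectory() as tmp:
    log_path = os.path.join(tmp, "stdout.log")
    with open(log_path, "w") as f:
        f.write("processed 100 records from topic\n")
    found = wait_for_line(log_path, "processed", timeout_sec=1.0)
    missing = wait_for_line(log_path, "never printed", timeout_sec=0.3)
    print(found, missing)
```

The check only proves that some records were processed after the bounce, which is why tightening it (for example, by waiting for a specific aggregate result) is a separate question.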





[GitHub] [kafka] vcrfxia commented on a diff in pull request #13656: KAFKA-14911: Add system tests for rolling upgrade path of KIP-904

Posted by "vcrfxia (via GitHub)" <gi...@apache.org>.
vcrfxia commented on code in PR #13656:
URL: https://github.com/apache/kafka/pull/13656#discussion_r1183071555


##########
streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java:
##########
@@ -143,6 +173,94 @@ private static void buildFKTable(final KStream<String, Integer> primaryTable,
         kStream.to("fk-result", Produced.with(stringSerde, stringSerde));
     }
 
+    private static void buildTableAgg(final KTable<String, Integer> sourceTable,
+                                      final String aggProduceValue,
+                                      final List<String> expectedAggValues) {
+        final KStream<Integer, String> result = sourceTable
+            .groupBy(
+                (k, v) -> new KeyValue<>(v, aggProduceValue),
+                Grouped.with(intSerde, stringSerde))
+            .aggregate(
+                () -> new Agg(Collections.emptyList(), 0),
+                (k, v, agg) -> {
+                    final List<String> seenValues;
+                    final boolean updated;
+                    if (!agg.seenValues.contains(v)) {
+                        seenValues = new ArrayList<>(agg.seenValues);
+                        seenValues.add(v);
+                        Collections.sort(seenValues);
+                        updated = true;
+                    } else {
+                        seenValues = agg.seenValues;
+                        updated = false;
+                    }
+
+                    final boolean shouldLog = updated || (agg.recordsProcessed % 10 == 0); // value of 10 is chosen for debugging purposes. can increase to 100 once test is passing.
+                    if (shouldLog) {

Review Comment:
   Ah, must've put that in for debugging and forgotten to leave a note. The new mechanism makes sense 👍 If you think `recordsProcessed % 10` is the right frequency, rather than `recordsProcessed % 100`, then perhaps we can remove the comment entirely and just leave it as `recordsProcessed % 10`.
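
The effect of the `updated || (recordsProcessed % 10 == 0)` condition can be seen in a small Python rendering of the aggregator. This is a sketch only: it assumes `recordsProcessed` is incremented once per record (the increment is not visible in the quoted hunk), and the function name and input sequence are made up.

```python
def aggregate(seen_values, records_processed, value, log_every=10):
    """Return (new_seen_values, new_records_processed, should_log)."""
    if value not in seen_values:
        new_seen = sorted(seen_values + [value])
        updated = True
    else:
        new_seen = seen_values
        updated = False
    # Log on every change, and otherwise only on every `log_every`-th record.
    should_log = updated or (records_processed % log_every == 0)
    return new_seen, records_processed + 1, should_log


seen, processed = [], 0
logged_at = []
# 25 records with value A, then one B, then four more A.
for i, value in enumerate(["A"] * 25 + ["B"] + ["A"] * 4):
    seen, processed, should_log = aggregate(seen, processed, value)
    if should_log:
        logged_at.append(i)

print(seen, logged_at)
```

New values are always logged immediately, so the modulus only controls how chatty the steady state is; raising it to 100 would not delay the lines the system test waits for.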





[GitHub] [kafka] fqaiser94 commented on a diff in pull request #13656: KAFKA-14911: Add system tests for rolling upgrade path of KIP-904

Posted by "fqaiser94 (via GitHub)" <gi...@apache.org>.
fqaiser94 commented on code in PR #13656:
URL: https://github.com/apache/kafka/pull/13656#discussion_r1190413444


##########
tests/kafkatest/tests/streams/streams_upgrade_test.py:
##########
@@ -236,6 +237,96 @@ def test_rolling_upgrade_with_2_bounces(self, from_version, to_version):
 
         self.stop_and_await()
 
+    @cluster(num_nodes=6)
+    @matrix(from_version=table_agg_versions, to_version=[str(DEV_VERSION)])
+    def test_rolling_upgrade_for_table_agg(self, from_version, to_version):
+        """
+        This test verifies that the cluster successfully upgrades despite changes in the table
+        repartition topic format.
+
+        Starts 3 KafkaStreams instances with version <from_version> and upgrades one-by-one to <to_version>
+        """
+
+        extra_properties = {'test.run_table_agg': 'true'}
+
+        self.set_up_services()
+
+        self.driver.start()
+
+        # encoding different target values for different versions
+        #  - old version: value=A
+        #  - new version with `upgrade_from` flag set: value=B
+        #  - new version w/o `upgrade_from` set set: value=C
+
+        extra_properties = extra_properties.copy()
+        extra_properties['test.agg_produce_value'] = 'A'
+        extra_properties['test.expected_agg_values'] = 'A'
+        self.start_all_nodes_with(from_version, extra_properties)
+
+        counter = 1
+        random.seed()
+
+        # rolling bounce
+        random.shuffle(self.processors)
+        p3 = self.processors[-1]
+        for p in self.processors:
+            p.CLEAN_NODE_ENABLED = False
+
+        # bounce two instances to new version (verifies that new version can process records
+        # written by old version)
+        extra_properties = extra_properties.copy()
+        extra_properties['test.agg_produce_value'] = 'B'
+        extra_properties['test.expected_agg_values'] = 'A,B'
+        for p in self.processors[:-1]:
+            self.do_stop_start_bounce(p, from_version[:-2], to_version, counter, extra_properties)
+            counter = counter + 1
+

Review Comment:
   ```suggestion
   # verify that old version can process records from new version
   self.wait_for_table_agg_success('A,B')
   ```
   
   Can we assert this condition earlier, like here? 
   I don't think we need to bounce the remaining instance before asserting this condition. 
   
   ![image](https://github.com/apache/kafka/assets/20507243/05ebd2fe-3192-47ea-9d25-9da337919a01)
   
   
   





[GitHub] [kafka] vcrfxia commented on a diff in pull request #13656: KAFKA-14911: Add system tests for rolling upgrade path of KIP-904

Posted by "vcrfxia (via GitHub)" <gi...@apache.org>.
vcrfxia commented on code in PR #13656:
URL: https://github.com/apache/kafka/pull/13656#discussion_r1183023137


##########
streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestUtil.java:
##########
@@ -34,10 +34,34 @@ public class SmokeTestUtil {
 
     final static int END = Integer.MAX_VALUE;
 
+    static ProcessorSupplier<Object, Object, Void, Void> printTaskProcessorSupplier(final String topic) {
+        return printTaskProcessorSupplier(topic, "");
+    }
+
     static ProcessorSupplier<Object, Object, Void, Void> printProcessorSupplier(final String topic) {
         return printProcessorSupplier(topic, "");
     }
 
+    static ProcessorSupplier<Object, Object, Void, Void> printTaskProcessorSupplier(final String topic, final String name) {

Review Comment:
   nit: this new processor is the same as the existing one except that it doesn't track or print the number of records processed, right? Would it be better to have a boolean to toggle the print behavior, rather than duplicating the rest of the processor code? (Not a big deal either way since it's not much code, but as a reader I had to spend some time determining/verifying that the print behavior is the only difference.)



##########
tests/kafkatest/tests/streams/streams_upgrade_test.py:
##########
@@ -41,6 +41,7 @@
 metadata_2_versions = [str(LATEST_0_10_1), str(LATEST_0_10_2), str(LATEST_0_11_0), str(LATEST_1_0), str(LATEST_1_1)]
 fk_join_versions = [str(LATEST_2_4), str(LATEST_2_5), str(LATEST_2_6), str(LATEST_2_7), str(LATEST_2_8), 
                     str(LATEST_3_0), str(LATEST_3_1), str(LATEST_3_2), str(LATEST_3_3)]
+table_agg_versions = [str(LATEST_3_3)]

Review Comment:
   What's the reason for adding older versions? Do we expect that upgrading from a version older than 3.3 will be different from upgrading from 3.3?



##########
streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java:
##########
@@ -143,6 +173,94 @@ private static void buildFKTable(final KStream<String, Integer> primaryTable,
         kStream.to("fk-result", Produced.with(stringSerde, stringSerde));
     }
 
+    private static void buildTableAgg(final KTable<String, Integer> sourceTable,
+                                      final String aggProduceValue,
+                                      final List<String> expectedAggValues) {
+        final KStream<Integer, String> result = sourceTable
+            .groupBy(
+                (k, v) -> new KeyValue<>(v, aggProduceValue),
+                Grouped.with(intSerde, stringSerde))
+            .aggregate(
+                () -> new Agg(Collections.emptyList(), 0),
+                (k, v, agg) -> {
+                    final List<String> seenValues;
+                    final boolean updated;
+                    if (!agg.seenValues.contains(v)) {
+                        seenValues = new ArrayList<>(agg.seenValues);
+                        seenValues.add(v);
+                        Collections.sort(seenValues);
+                        updated = true;
+                    } else {
+                        seenValues = agg.seenValues;
+                        updated = false;
+                    }
+
+                    final boolean shouldLog = updated || (agg.recordsProcessed % 10 == 0); // value of 10 is chosen for debugging purposes. can increase to 100 once test is passing.
+                    if (shouldLog) {

Review Comment:
   Hmm, I'm not seeing what the change was. Should we increase the value in the line above from 10 to 100? Currently the comment still says "value of 10 is chosen for debugging purposes. can increase to 100 once test is passing".



##########
streams/upgrade-system-tests-33/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java:
##########
@@ -60,6 +71,9 @@ public static void main(final String[] args) throws Exception {
         final boolean runFkJoin = Boolean.parseBoolean(streamsProperties.getProperty(
             "test.run_fk_join",
             "false"));
+        final boolean runTableAgg = Boolean.parseBoolean(streamsProperties.getProperty(

Review Comment:
   Doh! This is the step I was missing when I was testing these test changes earlier. Thanks for solving my mystery :)





[GitHub] [kafka] cadonna commented on a diff in pull request #13656: KAFKA-14911: Add system tests for rolling upgrade path of KIP-904

Posted by "cadonna (via GitHub)" <gi...@apache.org>.
cadonna commented on code in PR #13656:
URL: https://github.com/apache/kafka/pull/13656#discussion_r1187159604


##########
tests/kafkatest/tests/streams/streams_upgrade_test.py:
##########
@@ -236,6 +237,96 @@ def test_rolling_upgrade_with_2_bounces(self, from_version, to_version):
 
         self.stop_and_await()
 
+    @cluster(num_nodes=6)
+    @matrix(from_version=table_agg_versions, to_version=[str(DEV_VERSION)])
+    def test_rolling_upgrade_for_table_agg(self, from_version, to_version):
+        """
+        This test verifies that the cluster successfully upgrades despite changes in the table
+        repartition topic format.
+
+        Starts 3 KafkaStreams instances with version <from_version> and upgrades one-by-one to <to_version>
+        """
+
+        extra_properties = {'test.run_table_agg': 'true'}
+
+        self.set_up_services()
+
+        self.driver.start()
+
+        # encoding different target values for different versions
+        #  - old version: value=A
+        #  - new version with `upgrade_from` flag set: value=B
+        #  - new version w/o `upgrade_from` set: value=C
+
+        extra_properties = extra_properties.copy()
+        extra_properties['test.agg_produce_value'] = 'A'
+        extra_properties['test.expected_agg_values'] = 'A'
+        self.start_all_nodes_with(from_version, extra_properties)
+
+        counter = 1
+        random.seed()
+
+        # rolling bounce
+        random.shuffle(self.processors)
+        p3 = self.processors[-1]
+        for p in self.processors:
+            p.CLEAN_NODE_ENABLED = False
+
+        # bounce two instances to new version (verifies that new version can process records
+        # written by old version)
+        extra_properties = extra_properties.copy()
+        extra_properties['test.agg_produce_value'] = 'B'
+        extra_properties['test.expected_agg_values'] = 'A,B'
+        for p in self.processors[:-1]:
+            self.do_stop_start_bounce(p, from_version[:-2], to_version, counter, extra_properties)

Review Comment:
   Ah, I misinterpreted the code. I thought the whole list of from_versions was passed into the function. Now I see that it is just one version, obviously. My fault, sorry!
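
The confusion is easy to reproduce: `[:-2]` works on both lists and strings, and `@matrix` hands the test one element of `table_agg_versions` at a time, so the slice applies to a single version string. A minimal sketch, assuming a version string of the form `major.minor.patch` with a single-digit patch (the value `3.3.2` and the helper name `short_version` are illustrative, not taken from the PR):

```python
# Illustrative stand-in for the list passed to @matrix; the real list is built
# from kafkatest version constants such as str(LATEST_3_3).
table_agg_versions = ["3.3.2"]


def short_version(from_version):
    """Drop the last two characters of a version string, e.g. '.2' from '3.3.2'."""
    return from_version[:-2]


# Slicing the list itself returns a (here empty) sublist ...
print(table_agg_versions[:-2])

# ... while slicing one element, which is what the test method receives,
# returns a shortened string.
for from_version in table_agg_versions:
    print(short_version(from_version))
```

Note that this kind of slice only yields `major.minor` when the patch component is a single digit; a hypothetical `3.3.10` would become `3.3.` instead.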





[GitHub] [kafka] cadonna commented on a diff in pull request #13656: KAFKA-14911: Add system tests for rolling upgrade path of KIP-904

Posted by "cadonna (via GitHub)" <gi...@apache.org>.
cadonna commented on code in PR #13656:
URL: https://github.com/apache/kafka/pull/13656#discussion_r1187176080


##########
tests/kafkatest/tests/streams/streams_upgrade_test.py:
##########
@@ -236,6 +237,96 @@ def test_rolling_upgrade_with_2_bounces(self, from_version, to_version):
 
         self.stop_and_await()
 
+    @cluster(num_nodes=6)
+    @matrix(from_version=table_agg_versions, to_version=[str(DEV_VERSION)])
+    def test_rolling_upgrade_for_table_agg(self, from_version, to_version):
+        """
+        This test verifies that the cluster successfully upgrades despite changes in the table
+        repartition topic format.
+
+        Starts 3 KafkaStreams instances with version <from_version> and upgrades one-by-one to <to_version>
+        """
+
+        extra_properties = {'test.run_table_agg': 'true'}
+
+        self.set_up_services()
+
+        self.driver.start()
+
+        # encoding different target values for different versions
+        #  - old version: value=A
+        #  - new version with `upgrade_from` flag set: value=B
+        #  - new version w/o `upgrade_from` set: value=C
+
+        extra_properties = extra_properties.copy()
+        extra_properties['test.agg_produce_value'] = 'A'
+        extra_properties['test.expected_agg_values'] = 'A'
+        self.start_all_nodes_with(from_version, extra_properties)
+
+        counter = 1
+        random.seed()
+
+        # rolling bounce
+        random.shuffle(self.processors)
+        p3 = self.processors[-1]
+        for p in self.processors:
+            p.CLEAN_NODE_ENABLED = False
+
+        # bounce two instances to new version (verifies that new version can process records
+        # written by old version)

Review Comment:
   I see the check, but it does not guarantee that the instance that was not rolled had enough time to write records in the old format to partitions that will be read by the rolled instances. It also does not guarantee that records written by the non-rolled instance to those partitions have not already been consumed by the non-rolled instance itself before the rolled instances start processing. The same is true for the subsequent rolls.
   Does this make sense, or am I missing something?





[GitHub] [kafka] github-actions[bot] commented on pull request #13656: KAFKA-14911: Add system tests for rolling upgrade path of KIP-904

Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] commented on PR #13656:
URL: https://github.com/apache/kafka/pull/13656#issuecomment-1670601383

   This PR is being marked as stale since it has not had any activity in 90 days. If you would like to keep this PR alive, please ask a committer for review. If the PR has merge conflicts, please update it with the latest from trunk (or appropriate release branch). If this PR is no longer valid or desired, please feel free to close it. If no activity occurs in the next 30 days, it will be automatically closed.




[GitHub] [kafka] cadonna commented on a diff in pull request #13656: KAFKA-14911: Add system tests for rolling upgrade path of KIP-904

Posted by "cadonna (via GitHub)" <gi...@apache.org>.
cadonna commented on code in PR #13656:
URL: https://github.com/apache/kafka/pull/13656#discussion_r1183390548


##########
streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java:
##########
@@ -106,7 +106,11 @@ private static class ValueList {
         }
 
         int next() {
-            return (index < values.length) ? values[index++] : -1;
+            final int v = values[index++];
+            if (index >= values.length) {
+                index = 0;
+            }

Review Comment:
   Doesn't this risk introducing a lot of disorder into the timestamps? I am referring to the comment on line 100. What are the consequences of such disorder?
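
To make the behavioural change concrete, here is a minimal Python sketch of the two `next()` variants from the diff above (the Java class is translated loosely; the class names `ExhaustingValueList` and `CyclingValueList` are made up for illustration). The old variant returns `-1` once the values are used up, while the new one wraps around and replays the same values:

```python
class ExhaustingValueList:
    """Old behaviour: hand out each value once, then return -1 forever."""

    def __init__(self, values):
        self.values = list(values)
        self.index = 0

    def next(self):
        if self.index < len(self.values):
            value = self.values[self.index]
            self.index += 1
            return value
        return -1


class CyclingValueList:
    """New behaviour: wrap the index back to 0 after the last value."""

    def __init__(self, values):
        self.values = list(values)
        self.index = 0

    def next(self):
        value = self.values[self.index]
        self.index += 1
        if self.index >= len(self.values):
            self.index = 0
        return value


old = ExhaustingValueList([1, 2, 3])
new = CyclingValueList([1, 2, 3])
print([old.next() for _ in range(5)])  # [1, 2, 3, -1, -1]
print([new.next() for _ in range(5)])  # [1, 2, 3, 1, 2]
```

Whether the replayed values actually disturb timestamp order depends on how the driver assigns timestamps, which this sketch does not model.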



##########
tests/kafkatest/tests/streams/streams_upgrade_test.py:
##########
@@ -236,6 +237,96 @@ def test_rolling_upgrade_with_2_bounces(self, from_version, to_version):
 
         self.stop_and_await()
 
+    @cluster(num_nodes=6)
+    @matrix(from_version=table_agg_versions, to_version=[str(DEV_VERSION)])
+    def test_rolling_upgrade_for_table_agg(self, from_version, to_version):
+        """
+        This test verifies that the cluster successfully upgrades despite changes in the table
+        repartition topic format.
+
+        Starts 3 KafkaStreams instances with version <from_version> and upgrades one-by-one to <to_version>
+        """
+
+        extra_properties = {'test.run_table_agg': 'true'}
+
+        self.set_up_services()
+
+        self.driver.start()
+
+        # encoding different target values for different versions
+        #  - old version: value=A
+        #  - new version with `upgrade_from` flag set: value=B
+        #  - new version w/o `upgrade_from` set: value=C
+
+        extra_properties = extra_properties.copy()
+        extra_properties['test.agg_produce_value'] = 'A'
+        extra_properties['test.expected_agg_values'] = 'A'
+        self.start_all_nodes_with(from_version, extra_properties)
+
+        counter = 1
+        random.seed()
+
+        # rolling bounce
+        random.shuffle(self.processors)
+        p3 = self.processors[-1]
+        for p in self.processors:
+            p.CLEAN_NODE_ENABLED = False
+
+        # bounce two instances to new version (verifies that new version can process records
+        # written by old version)
+        extra_properties = extra_properties.copy()
+        extra_properties['test.agg_produce_value'] = 'B'
+        extra_properties['test.expected_agg_values'] = 'A,B'
+        for p in self.processors[:-1]:
+            self.do_stop_start_bounce(p, from_version[:-2], to_version, counter, extra_properties)

Review Comment:
   I do not understand `from_version[:-2]` here. Doesn't this return a sublist? 



##########
streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestUtil.java:
##########
@@ -34,10 +34,34 @@ public class SmokeTestUtil {
 
     final static int END = Integer.MAX_VALUE;
 
+    static ProcessorSupplier<Object, Object, Void, Void> printTaskProcessorSupplier(final String topic) {
+        return printTaskProcessorSupplier(topic, "");
+    }
+
     static ProcessorSupplier<Object, Object, Void, Void> printProcessorSupplier(final String topic) {
         return printProcessorSupplier(topic, "");
     }
 
+    static ProcessorSupplier<Object, Object, Void, Void> printTaskProcessorSupplier(final String topic, final String name) {

Review Comment:
   It seems the parameter `name` is not used.



##########
tests/kafkatest/tests/streams/streams_upgrade_test.py:
##########
@@ -236,6 +237,96 @@ def test_rolling_upgrade_with_2_bounces(self, from_version, to_version):
 
         self.stop_and_await()
 
+    @cluster(num_nodes=6)
+    @matrix(from_version=table_agg_versions, to_version=[str(DEV_VERSION)])
+    def test_rolling_upgrade_for_table_agg(self, from_version, to_version):
+        """
+        This test verifies that the cluster successfully upgrades despite changes in the table
+        repartition topic format.
+
+        Starts 3 KafkaStreams instances with version <from_version> and upgrades one-by-one to <to_version>
+        """
+
+        extra_properties = {'test.run_table_agg': 'true'}
+
+        self.set_up_services()
+
+        self.driver.start()
+
+        # encoding different target values for different versions
+        #  - old version: value=A
+        #  - new version with `upgrade_from` flag set: value=B
+        #  - new version w/o `upgrade_from` set: value=C
+
+        extra_properties = extra_properties.copy()
+        extra_properties['test.agg_produce_value'] = 'A'
+        extra_properties['test.expected_agg_values'] = 'A'
+        self.start_all_nodes_with(from_version, extra_properties)
+
+        counter = 1
+        random.seed()
+
+        # rolling bounce
+        random.shuffle(self.processors)
+        p3 = self.processors[-1]
+        for p in self.processors:
+            p.CLEAN_NODE_ENABLED = False
+
+        # bounce two instances to new version (verifies that new version can process records
+        # written by old version)

Review Comment:
   Do we have any guarantee that the instances on the new version actually read any records from the repartition topic?
   Do we need to insert some sleeps to ensure that enough records are written to the repartition topics?
   Do we need to increase the commit interval to ensure that records are not purged from the repartition topic?
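
One alternative to fixed sleeps is to poll for a condition with a timeout before bouncing the next instance. The sketch below is a self-contained illustration only: `wait_for`, `FakeRepartitionTopic`, and the record threshold are hypothetical and not part of the PR or of kafkatest (ducktape ships a similar `wait_until` helper, which the real system tests would more likely use). It speaks only to the question of whether enough records were written; it does not show which instance ends up consuming them.

```python
import time


def wait_for(condition, timeout_sec, backoff_sec=0.1):
    """Poll `condition` until it returns True or `timeout_sec` elapses.

    Returns True if the condition was met, False on timeout.
    """
    deadline = time.monotonic() + timeout_sec
    while True:
        if condition():
            return True
        if time.monotonic() >= deadline:
            return False
        time.sleep(backoff_sec)


class FakeRepartitionTopic:
    """Hypothetical stand-in for a real topic; it only counts appended records."""

    def __init__(self):
        self.record_count = 0

    def append(self, n):
        self.record_count += n


topic = FakeRepartitionTopic()
topic.append(150)  # pretend the old-version instance wrote 150 records

# Only proceed with the bounce once enough old-format records exist.
if wait_for(lambda: topic.record_count >= 100, timeout_sec=1.0):
    print("enough records written; proceeding with the bounce")
else:
    print("timed out waiting for records")
```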





[GitHub] [kafka] mjsax commented on a diff in pull request #13656: KAFKA-14911: Add system tests for rolling upgrade path of KIP-904

Posted by "mjsax (via GitHub)" <gi...@apache.org>.
mjsax commented on code in PR #13656:
URL: https://github.com/apache/kafka/pull/13656#discussion_r1183042758


##########
streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java:
##########
@@ -143,6 +173,94 @@ private static void buildFKTable(final KStream<String, Integer> primaryTable,
         kStream.to("fk-result", Produced.with(stringSerde, stringSerde));
     }
 
+    private static void buildTableAgg(final KTable<String, Integer> sourceTable,
+                                      final String aggProduceValue,
+                                      final List<String> expectedAggValues) {
+        final KStream<Integer, String> result = sourceTable
+            .groupBy(
+                (k, v) -> new KeyValue<>(v, aggProduceValue),
+                Grouped.with(intSerde, stringSerde))
+            .aggregate(
+                () -> new Agg(Collections.emptyList(), 0),
+                (k, v, agg) -> {
+                    final List<String> seenValues;
+                    final boolean updated;
+                    if (!agg.seenValues.contains(v)) {
+                        seenValues = new ArrayList<>(agg.seenValues);
+                        seenValues.add(v);
+                        Collections.sort(seenValues);
+                        updated = true;
+                    } else {
+                        seenValues = agg.seenValues;
+                        updated = false;
+                    }
+
+                    final boolean shouldLog = updated || (agg.recordsProcessed % 10 == 0); // value of 10 is chosen for debugging purposes. can increase to 100 once test is passing.
+                    if (shouldLog) {

Review Comment:
   Originally it was 
   ```
   if (shouldLog && seenValues.containsAll(expectedAggValues)) {
    ...
   } else {
    ...
   }
   ```
    
   So it always logged something.
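
The difference between the two shapes can be sketched in Python (a loose translation; the function names and log strings are invented, and the body of the new `if (shouldLog)` block is not shown in the quoted diff, so the second function is an assumption about its structure). With the combined condition, the `else` branch also runs when `shouldLog` is false, so every record produces a log line:

```python
def combined_condition(should_log, seen_values, expected_values):
    """Original shape: `if (shouldLog && containsAll) {...} else {...}`."""
    if should_log and set(expected_values) <= set(seen_values):
        return "all expected values seen"
    else:
        # Reached when should_log is False as well, so something is always logged.
        return "expected values missing"


def guarded_condition(should_log, seen_values, expected_values):
    """Assumed new shape: check should_log first, then pick the message."""
    if not should_log:
        return None  # nothing is logged for this record
    if set(expected_values) <= set(seen_values):
        return "all expected values seen"
    return "expected values missing"


print(combined_condition(False, ["A"], ["A"]))  # expected values missing
print(guarded_condition(False, ["A"], ["A"]))   # None
```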





[GitHub] [kafka] mjsax commented on a diff in pull request #13656: KAFKA-14911: Add system tests for rolling upgrade path of KIP-904

Posted by "mjsax (via GitHub)" <gi...@apache.org>.
mjsax commented on code in PR #13656:
URL: https://github.com/apache/kafka/pull/13656#discussion_r1183043768


##########
streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestUtil.java:
##########
@@ -34,10 +34,34 @@ public class SmokeTestUtil {
 
     final static int END = Integer.MAX_VALUE;
 
+    static ProcessorSupplier<Object, Object, Void, Void> printTaskProcessorSupplier(final String topic) {
+        return printTaskProcessorSupplier(topic, "");
+    }
+
     static ProcessorSupplier<Object, Object, Void, Void> printProcessorSupplier(final String topic) {
         return printProcessorSupplier(topic, "");
     }
 
+    static ProcessorSupplier<Object, Object, Void, Void> printTaskProcessorSupplier(final String topic, final String name) {

Review Comment:
   Yeah, it was a quick-and-dirty thing -- guess it might make sense to actually have a single Processor and let the original one inherit from the new one.




