Posted to jira@kafka.apache.org by "fqaiser94 (via GitHub)" <gi...@apache.org> on 2023/05/10 21:53:32 UTC

[GitHub] [kafka] fqaiser94 commented on a diff in pull request #13656: KAFKA-14911: Add system tests for rolling upgrade path of KIP-904

fqaiser94 commented on code in PR #13656:
URL: https://github.com/apache/kafka/pull/13656#discussion_r1190412985


##########
tests/kafkatest/tests/streams/streams_upgrade_test.py:
##########
@@ -236,6 +237,96 @@ def test_rolling_upgrade_with_2_bounces(self, from_version, to_version):
 
         self.stop_and_await()
 
+    @cluster(num_nodes=6)
+    @matrix(from_version=table_agg_versions, to_version=[str(DEV_VERSION)])
+    def test_rolling_upgrade_for_table_agg(self, from_version, to_version):
+        """
+        This test verifies that the cluster successfully upgrades despite changes in the table
+        repartition topic format.
+
+        Starts 3 KafkaStreams instances with version <from_version> and upgrades one-by-one to <to_version>
+        """
+
+        extra_properties = {'test.run_table_agg': 'true'}
+
+        self.set_up_services()
+
+        self.driver.start()
+
+        # encoding different target values for different versions
+        #  - old version: value=A
+        #  - new version with `upgrade_from` flag set: value=B
+        #  - new version w/o `upgrade_from` set set: value=C
+
+        extra_properties = extra_properties.copy()
+        extra_properties['test.agg_produce_value'] = 'A'
+        extra_properties['test.expected_agg_values'] = 'A'
+        self.start_all_nodes_with(from_version, extra_properties)
+
+        counter = 1
+        random.seed()
+
+        # rolling bounce
+        random.shuffle(self.processors)
+        p3 = self.processors[-1]
+        for p in self.processors:
+            p.CLEAN_NODE_ENABLED = False
+

Review Comment:
   ```suggestion
   self.wait_for_table_agg_success('A')
   ```
   
   nit: could we add this here? 
   This would just ensure that, at this point, all instances have processed a message and their aggregates look like `Agg(List('A'))`. 



##########
tests/kafkatest/tests/streams/streams_upgrade_test.py:
##########
@@ -236,6 +237,96 @@ def test_rolling_upgrade_with_2_bounces(self, from_version, to_version):
 
         self.stop_and_await()
 
+    @cluster(num_nodes=6)
+    @matrix(from_version=table_agg_versions, to_version=[str(DEV_VERSION)])
+    def test_rolling_upgrade_for_table_agg(self, from_version, to_version):
+        """
+        This test verifies that the cluster successfully upgrades despite changes in the table
+        repartition topic format.
+
+        Starts 3 KafkaStreams instances with version <from_version> and upgrades one-by-one to <to_version>
+        """
+
+        extra_properties = {'test.run_table_agg': 'true'}
+
+        self.set_up_services()
+
+        self.driver.start()
+
+        # encoding different target values for different versions
+        #  - old version: value=A
+        #  - new version with `upgrade_from` flag set: value=B
+        #  - new version w/o `upgrade_from` set set: value=C
+
+        extra_properties = extra_properties.copy()
+        extra_properties['test.agg_produce_value'] = 'A'
+        extra_properties['test.expected_agg_values'] = 'A'
+        self.start_all_nodes_with(from_version, extra_properties)
+
+        counter = 1
+        random.seed()
+
+        # rolling bounce
+        random.shuffle(self.processors)
+        p3 = self.processors[-1]
+        for p in self.processors:
+            p.CLEAN_NODE_ENABLED = False
+
+        # bounce two instances to new version (verifies that new version can process records
+        # written by old version)
+        extra_properties = extra_properties.copy()
+        extra_properties['test.agg_produce_value'] = 'B'
+        extra_properties['test.expected_agg_values'] = 'A,B'
+        for p in self.processors[:-1]:
+            self.do_stop_start_bounce(p, from_version[:-2], to_version, counter, extra_properties)
+            counter = counter + 1
+
+        # bounce remaining instance on old version (just for verification purposes, to verify that
+        # instance on old version can process records written by new version)
+        extra_properties = extra_properties.copy()
+        extra_properties['test.agg_produce_value'] = 'A'
+        extra_properties['test.expected_agg_values'] = 'A,B'
+        self.do_stop_start_bounce(p3, None, from_version, counter, extra_properties)
+        counter = counter + 1
+
+        self.wait_for_table_agg_success('A,B')

Review Comment:
   ```suggestion
           # bounce remaining instance on old version to produce a new unique value
           extra_properties = extra_properties.copy()
           extra_properties['test.agg_produce_value'] = 'C'
           extra_properties['test.expected_agg_values'] = 'A,B,C'
           self.do_stop_start_bounce(p3, None, from_version, counter, extra_properties)
           counter = counter + 1
           
           # verify that new version can process records from old version
           self.wait_for_table_agg_success('A,B,C')
   ```
   
   I think it might be better to have this old (not-upgraded) instance start producing a new value when we bounce it here. That way, we can assert using `self.wait_for_table_agg_success('A,B,C')` and be sure that the two upgraded instances have successfully processed messages from the old (not-upgraded) instance as well. 
   
   (Note, if you accept this change, you will need to make changes below here to produce new unique values like D,E,F, etc.)
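
To illustrate the reasoning, here is a minimal Python sketch of the aggregator's "add if unseen, then sort" behaviour as it appears in the Java diff. The helper names `aggregate` and `run` are hypothetical and exist only for this illustration; this is a simulation, not the actual test code.

```python
def aggregate(seen, value):
    """Mimic the aggregator in the diff: add the value if unseen, keep the list sorted."""
    if value in seen:
        return seen
    return sorted(seen + [value])


def run(produced_values):
    """Fold a sequence of produced values into the comma-joined 'seen' string."""
    seen = []
    for value in produced_values:
        seen = aggregate(seen, value)
    return ",".join(seen)


# The bounced old instance keeps producing 'A': the result is the same whether
# or not its post-bounce record was ever processed.
reused_and_processed = run(["A", "B", "A"])
reused_but_dropped = run(["A", "B"])

# The bounced old instance produces a fresh value 'C': the result only reaches
# 'A,B,C' if that record was actually processed.
unique_and_processed = run(["A", "B", "C"])

print(reused_and_processed, reused_but_dropped, unique_and_processed)
```

With a reused value the two outcomes are indistinguishable, which is why asserting on a new unique value gives a stronger check.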



##########
tests/kafkatest/tests/streams/streams_upgrade_test.py:
##########
@@ -236,6 +237,96 @@ def test_rolling_upgrade_with_2_bounces(self, from_version, to_version):
 
         self.stop_and_await()
 
+    @cluster(num_nodes=6)
+    @matrix(from_version=table_agg_versions, to_version=[str(DEV_VERSION)])
+    def test_rolling_upgrade_for_table_agg(self, from_version, to_version):
+        """
+        This test verifies that the cluster successfully upgrades despite changes in the table
+        repartition topic format.
+
+        Starts 3 KafkaStreams instances with version <from_version> and upgrades one-by-one to <to_version>
+        """
+
+        extra_properties = {'test.run_table_agg': 'true'}
+
+        self.set_up_services()
+
+        self.driver.start()
+
+        # encoding different target values for different versions
+        #  - old version: value=A
+        #  - new version with `upgrade_from` flag set: value=B
+        #  - new version w/o `upgrade_from` set set: value=C
+
+        extra_properties = extra_properties.copy()
+        extra_properties['test.agg_produce_value'] = 'A'
+        extra_properties['test.expected_agg_values'] = 'A'
+        self.start_all_nodes_with(from_version, extra_properties)
+
+        counter = 1
+        random.seed()
+
+        # rolling bounce
+        random.shuffle(self.processors)
+        p3 = self.processors[-1]
+        for p in self.processors:
+            p.CLEAN_NODE_ENABLED = False
+
+        # bounce two instances to new version (verifies that new version can process records
+        # written by old version)
+        extra_properties = extra_properties.copy()
+        extra_properties['test.agg_produce_value'] = 'B'
+        extra_properties['test.expected_agg_values'] = 'A,B'
+        for p in self.processors[:-1]:
+            self.do_stop_start_bounce(p, from_version[:-2], to_version, counter, extra_properties)
+            counter = counter + 1
+

Review Comment:
   ```suggestion
   # verify that old version can process records from new version
   self.wait_for_table_agg_success('A,B')
   ```
   
   Can we just assert this condition here? 
   I don't think we need to bounce the remaining instance before asserting this condition. 



##########
tests/kafkatest/tests/streams/streams_upgrade_test.py:
##########
@@ -236,6 +237,96 @@ def test_rolling_upgrade_with_2_bounces(self, from_version, to_version):
 
         self.stop_and_await()
 
+    @cluster(num_nodes=6)
+    @matrix(from_version=table_agg_versions, to_version=[str(DEV_VERSION)])
+    def test_rolling_upgrade_for_table_agg(self, from_version, to_version):
+        """
+        This test verifies that the cluster successfully upgrades despite changes in the table
+        repartition topic format.
+
+        Starts 3 KafkaStreams instances with version <from_version> and upgrades one-by-one to <to_version>
+        """
+
+        extra_properties = {'test.run_table_agg': 'true'}
+
+        self.set_up_services()
+
+        self.driver.start()
+
+        # encoding different target values for different versions
+        #  - old version: value=A
+        #  - new version with `upgrade_from` flag set: value=B
+        #  - new version w/o `upgrade_from` set set: value=C
+
+        extra_properties = extra_properties.copy()
+        extra_properties['test.agg_produce_value'] = 'A'
+        extra_properties['test.expected_agg_values'] = 'A'
+        self.start_all_nodes_with(from_version, extra_properties)
+
+        counter = 1
+        random.seed()
+
+        # rolling bounce
+        random.shuffle(self.processors)
+        p3 = self.processors[-1]
+        for p in self.processors:
+            p.CLEAN_NODE_ENABLED = False

Review Comment:
   ```suggestion
           random.shuffle(self.processors)
           p3 = self.processors[-1]
           for p in self.processors:
               p.CLEAN_NODE_ENABLED = False
   ```
   
   It seems to me these lines of code have nothing to do with executing a "rolling bounce", so the suggestion above drops the `# rolling bounce` comment.



##########
tests/kafkatest/tests/streams/streams_upgrade_test.py:
##########
@@ -236,6 +237,96 @@ def test_rolling_upgrade_with_2_bounces(self, from_version, to_version):
 
         self.stop_and_await()
 
+    @cluster(num_nodes=6)
+    @matrix(from_version=table_agg_versions, to_version=[str(DEV_VERSION)])
+    def test_rolling_upgrade_for_table_agg(self, from_version, to_version):
+        """
+        This test verifies that the cluster successfully upgrades despite changes in the table
+        repartition topic format.
+
+        Starts 3 KafkaStreams instances with version <from_version> and upgrades one-by-one to <to_version>
+        """
+
+        extra_properties = {'test.run_table_agg': 'true'}
+
+        self.set_up_services()
+
+        self.driver.start()
+
+        # encoding different target values for different versions
+        #  - old version: value=A
+        #  - new version with `upgrade_from` flag set: value=B
+        #  - new version w/o `upgrade_from` set set: value=C

Review Comment:
   ```suggestion
           #  - new version w/o `upgrade_from` flag set: value=C
   ```



##########
streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java:
##########
@@ -143,6 +173,94 @@ private static void buildFKTable(final KStream<String, Integer> primaryTable,
         kStream.to("fk-result", Produced.with(stringSerde, stringSerde));
     }
 
+    private static void buildTableAgg(final KTable<String, Integer> sourceTable,
+                                      final String aggProduceValue,
+                                      final List<String> expectedAggValues) {
+        final KStream<Integer, String> result = sourceTable
+            .groupBy(
+                (k, v) -> new KeyValue<>(v, aggProduceValue),
+                Grouped.with(intSerde, stringSerde))
+            .aggregate(
+                () -> new Agg(Collections.emptyList(), 0),
+                (k, v, agg) -> {
+                    final List<String> seenValues;
+                    final boolean updated;
+                    if (!agg.seenValues.contains(v)) {
+                        seenValues = new ArrayList<>(agg.seenValues);
+                        seenValues.add(v);
+                        Collections.sort(seenValues);

Review Comment:
   Is there a reason we **_need_** to sort? 
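
For context, whether the sort is needed may depend on how the seen values are compared later. The Python sketch below is only an illustration, with hypothetical helper names: it assumes the values can arrive in any order and contrasts an order-sensitive comparison (where sorting normalizes the result) with a set comparison (where no sort is required).

```python
def matches_in_order(seen, expected):
    """Order-sensitive comparison: only passes if both lists have the same order."""
    return list(seen) == list(expected)


def matches_as_set(seen, expected):
    """Order-insensitive comparison: passes regardless of arrival order."""
    return set(seen) == set(expected)


arrival_order = ["B", "A"]   # assumed: 'B' happened to be aggregated first
expected = ["A", "B"]

unsorted_ok = matches_in_order(arrival_order, expected)
sorted_ok = matches_in_order(sorted(arrival_order), expected)
set_ok = matches_as_set(arrival_order, expected)

print(unsorted_ok, sorted_ok, set_ok)
```

If the seen values end up in a log line that is matched as text, a deterministic order would still be needed, so the sort may be there for that reason.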



##########
tests/kafkatest/tests/streams/streams_upgrade_test.py:
##########
@@ -236,6 +237,96 @@ def test_rolling_upgrade_with_2_bounces(self, from_version, to_version):
 
         self.stop_and_await()
 
+    @cluster(num_nodes=6)
+    @matrix(from_version=table_agg_versions, to_version=[str(DEV_VERSION)])
+    def test_rolling_upgrade_for_table_agg(self, from_version, to_version):
+        """
+        This test verifies that the cluster successfully upgrades despite changes in the table
+        repartition topic format.
+
+        Starts 3 KafkaStreams instances with version <from_version> and upgrades one-by-one to <to_version>
+        """
+
+        extra_properties = {'test.run_table_agg': 'true'}
+
+        self.set_up_services()
+
+        self.driver.start()
+
+        # encoding different target values for different versions
+        #  - old version: value=A
+        #  - new version with `upgrade_from` flag set: value=B
+        #  - new version w/o `upgrade_from` set set: value=C
+
+        extra_properties = extra_properties.copy()
+        extra_properties['test.agg_produce_value'] = 'A'
+        extra_properties['test.expected_agg_values'] = 'A'
+        self.start_all_nodes_with(from_version, extra_properties)
+
+        counter = 1
+        random.seed()
+
+        # rolling bounce
+        random.shuffle(self.processors)
+        p3 = self.processors[-1]
+        for p in self.processors:
+            p.CLEAN_NODE_ENABLED = False
+
+        # bounce two instances to new version (verifies that new version can process records
+        # written by old version)
+        extra_properties = extra_properties.copy()
+        extra_properties['test.agg_produce_value'] = 'B'
+        extra_properties['test.expected_agg_values'] = 'A,B'
+        for p in self.processors[:-1]:
+            self.do_stop_start_bounce(p, from_version[:-2], to_version, counter, extra_properties)
+            counter = counter + 1
+
+        # bounce remaining instance on old version (just for verification purposes, to verify that
+        # instance on old version can process records written by new version)
+        extra_properties = extra_properties.copy()
+        extra_properties['test.agg_produce_value'] = 'A'
+        extra_properties['test.expected_agg_values'] = 'A,B'
+        self.do_stop_start_bounce(p3, None, from_version, counter, extra_properties)
+        counter = counter + 1
+
+        self.wait_for_table_agg_success('A,B')
+
+        # bounce remaining instance to new version (verifies that new version without upgrade_from
+        # can process records written by new version with upgrade_from)
+        extra_properties = extra_properties.copy()
+        extra_properties['test.agg_produce_value'] = 'C'
+        extra_properties['test.expected_agg_values'] = 'A,B,C'
+        self.do_stop_start_bounce(p3, None, to_version, counter, extra_properties)
+        counter = counter + 1
+
+        # bounce first instances again without removing upgrade_from (just for verification purposes,
+        # to verify that instance with upgrade_from can process records written without upgrade_from)
+        extra_properties = extra_properties.copy()
+        extra_properties['test.agg_produce_value'] = 'B'
+        extra_properties['test.expected_agg_values'] = 'A,B,C'
+        for p in self.processors[:-1]:
+            self.do_stop_start_bounce(p, from_version[:-2], to_version, counter, extra_properties)
+            counter = counter + 1
+
+        self.wait_for_table_agg_success('A,B,C')
+
+        self.stop_and_await()
+
+    def wait_for_table_agg_success(self, expected_values):
+        agg_success_str = "Table aggregate processor saw expected values. Seen: " + expected_values

Review Comment:
   Surprised you don't need to specify the full string here. I guess we only check for lines that begin with `agg_success_str`, rather than lines that match `agg_success_str` exactly. 
   
   https://github.com/apache/kafka/blob/a70e6ba464cc6ef2196a7c2e419a308b627f41f1/streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java#L201
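
A small Python sketch of the difference between the two matching styles. The trailing part of `log_line` is hypothetical (it stands in for whatever the Java side appends after the seen values), and the helper names are made up for this illustration; it does not reproduce how the test framework actually scans logs.

```python
def exact_match(line, expected):
    """Passes only if the whole line equals the expected string."""
    return line == expected


def prefix_match(line, expected):
    """Passes if the line merely begins with the expected string."""
    return line.startswith(expected)


agg_success_str = "Table aggregate processor saw expected values. Seen: " + "A,B"

# Hypothetical log line: everything after 'A,B' is an assumed suffix.
log_line = agg_success_str + ". (hypothetical trailing text)"

exact_result = exact_match(log_line, agg_success_str)
prefix_result = prefix_match(log_line, agg_success_str)

# Caveat of prefix matching: waiting for 'A' is also satisfied by a line for 'A,B'.
shorter_expectation = "Table aggregate processor saw expected values. Seen: " + "A"
shorter_result = prefix_match(log_line, shorter_expectation)

print(exact_result, prefix_result, shorter_result)
```

Under prefix matching, an expectation like `'A'` would also be satisfied by a later line reporting `'A,B'`, which may or may not be acceptable for this test.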


