You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ca...@apache.org on 2021/09/27 15:06:07 UTC

[kafka] branch trunk updated: MINOR: replace deprecated exactly_once_beta into exactly_once_v2 (#10884)

This is an automated email from the ASF dual-hosted git repository.

cadonna pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 1af1c80  MINOR: replace deprecated exactly_once_beta into exactly_once_v2 (#10884)
1af1c80 is described below

commit 1af1c80e2de561e012313d69649635d871e43181
Author: Luke Chen <sh...@gmail.com>
AuthorDate: Mon Sep 27 23:02:48 2021 +0800

    MINOR: replace deprecated exactly_once_beta into exactly_once_v2 (#10884)
    
    Replace deprecated exactly_once_beta with exactly_once_v2 in system tests.
    
    Follow up for #10870, found out there are still some system tests using the deprecated exactly_once_beta. This PR updates them.
    
    Reviewers: Bruno Cadonna <ca...@apache.org>
---
 .../streams/streams_broker_compatibility_test.py   | 47 ++++++++++++----------
 tests/kafkatest/tests/streams/streams_eos_test.py  |  8 ++--
 .../tests/streams/streams_relational_smoke_test.py |  4 +-
 3 files changed, 31 insertions(+), 28 deletions(-)

diff --git a/tests/kafkatest/tests/streams/streams_broker_compatibility_test.py b/tests/kafkatest/tests/streams/streams_broker_compatibility_test.py
index 1e7ece7..83b0735 100644
--- a/tests/kafkatest/tests/streams/streams_broker_compatibility_test.py
+++ b/tests/kafkatest/tests/streams/streams_broker_compatibility_test.py
@@ -22,7 +22,8 @@ from kafkatest.services.streams import StreamsBrokerCompatibilityService
 from kafkatest.services.verifiable_consumer import VerifiableConsumer
 from kafkatest.services.zookeeper import ZookeeperService
 from kafkatest.version import LATEST_0_11_0, LATEST_0_10_2, LATEST_0_10_1, LATEST_0_10_0, LATEST_1_0, LATEST_1_1, \
-    LATEST_2_0, LATEST_2_1, LATEST_2_2, LATEST_2_3, LATEST_2_4, LATEST_2_5, LATEST_2_6, KafkaVersion
+    LATEST_2_0, LATEST_2_1, LATEST_2_2, LATEST_2_3, LATEST_2_4, LATEST_2_5, LATEST_2_6, LATEST_2_7, LATEST_2_8, \
+    KafkaVersion
 
 
 class StreamsBrokerCompatibility(Test):
@@ -30,9 +31,9 @@ class StreamsBrokerCompatibility(Test):
     These tests validates that
     - Streams works for older brokers 0.11 (or newer)
     - Streams w/ EOS-alpha works for older brokers 0.11 (or newer)
-    - Streams w/ EOS-beta works for older brokers 2.5 (or newer)
+    - Streams w/ EOS-v2 works for older brokers 2.5 (or newer)
     - Streams fails fast for older brokers 0.10.0, 0.10.2, and 0.10.1
-    - Streams w/ EOS-beta fails fast for older brokers 2.4 or older
+    - Streams w/ EOS-v2 fails fast for older brokers 2.4 or older
     """
 
     input = "brokerCompatibilitySourceTopic"
@@ -114,23 +115,25 @@ class StreamsBrokerCompatibility(Test):
         self.consumer.stop()
         self.kafka.stop()
 
-    # TODO enable after 2.5 is released
-    # @parametrize(broker_version=str(LATEST_2_5))
-    # def test_compatible_brokers_eos_beta_enabled(self, broker_version):
-    #     self.kafka.set_version(KafkaVersion(broker_version))
-    #     self.kafka.start()
-    #
-    #     processor = StreamsBrokerCompatibilityService(self.test_context, self.kafka, "exactly_once_beta")
-    #     processor.start()
-    #
-    #     self.consumer.start()
-    #
-    #     processor.wait()
-    #
-    #     wait_until(lambda: self.consumer.total_consumed() > 0, timeout_sec=30, err_msg="Did expect to read a message but got none within 30 seconds.")
-    #
-    #     self.consumer.stop()
-    #     self.kafka.stop()
+    @parametrize(broker_version=str(LATEST_2_8))
+    @parametrize(broker_version=str(LATEST_2_7))
+    @parametrize(broker_version=str(LATEST_2_6))
+    @parametrize(broker_version=str(LATEST_2_5))
+    def test_compatible_brokers_eos_v2_enabled(self, broker_version):
+        self.kafka.set_version(KafkaVersion(broker_version))
+        self.kafka.start()
+
+        processor = StreamsBrokerCompatibilityService(self.test_context, self.kafka, "exactly_once_v2")
+        processor.start()
+
+        self.consumer.start()
+
+        processor.wait()
+
+        wait_until(lambda: self.consumer.total_consumed() > 0, timeout_sec=30, err_msg="Did expect to read a message but got none within 30 seconds.")
+
+        self.consumer.stop()
+        self.kafka.stop()
 
     @cluster(num_nodes=4)
     @parametrize(broker_version=str(LATEST_0_10_2))
@@ -159,11 +162,11 @@ class StreamsBrokerCompatibility(Test):
     @parametrize(broker_version=str(LATEST_1_1))
     @parametrize(broker_version=str(LATEST_1_0))
     @parametrize(broker_version=str(LATEST_0_11_0))
-    def test_fail_fast_on_incompatible_brokers_if_eos_beta_enabled(self, broker_version):
+    def test_fail_fast_on_incompatible_brokers_if_eos_v2_enabled(self, broker_version):
         self.kafka.set_version(KafkaVersion(broker_version))
         self.kafka.start()
 
-        processor = StreamsBrokerCompatibilityService(self.test_context, self.kafka, "exactly_once_beta")
+        processor = StreamsBrokerCompatibilityService(self.test_context, self.kafka, "exactly_once_v2")
 
         with processor.node.account.monitor_log(processor.STDERR_FILE) as monitor:
             with processor.node.account.monitor_log(processor.LOG_FILE) as log:
diff --git a/tests/kafkatest/tests/streams/streams_eos_test.py b/tests/kafkatest/tests/streams/streams_eos_test.py
index 5e9091e..618d378 100644
--- a/tests/kafkatest/tests/streams/streams_eos_test.py
+++ b/tests/kafkatest/tests/streams/streams_eos_test.py
@@ -39,7 +39,7 @@ class StreamsEosTest(KafkaTest):
 
     @cluster(num_nodes=9)
     @parametrize(processing_guarantee="exactly_once")
-    @parametrize(processing_guarantee="exactly_once_beta")
+    @parametrize(processing_guarantee="exactly_once_v2")
     def test_rebalance_simple(self, processing_guarantee):
         self.run_rebalance(StreamsEosTestJobRunnerService(self.test_context, self.kafka, processing_guarantee),
                            StreamsEosTestJobRunnerService(self.test_context, self.kafka, processing_guarantee),
@@ -48,7 +48,7 @@ class StreamsEosTest(KafkaTest):
 
     @cluster(num_nodes=9)
     @parametrize(processing_guarantee="exactly_once")
-    @parametrize(processing_guarantee="exactly_once_beta")
+    @parametrize(processing_guarantee="exactly_once_v2")
     def test_rebalance_complex(self, processing_guarantee):
         self.run_rebalance(StreamsComplexEosTestJobRunnerService(self.test_context, self.kafka, processing_guarantee),
                            StreamsComplexEosTestJobRunnerService(self.test_context, self.kafka, processing_guarantee),
@@ -83,7 +83,7 @@ class StreamsEosTest(KafkaTest):
 
     @cluster(num_nodes=9)
     @parametrize(processing_guarantee="exactly_once")
-    @parametrize(processing_guarantee="exactly_once_beta")
+    @parametrize(processing_guarantee="exactly_once_v2")
     def test_failure_and_recovery(self, processing_guarantee):
         self.run_failure_and_recovery(StreamsEosTestJobRunnerService(self.test_context, self.kafka, processing_guarantee),
                                       StreamsEosTestJobRunnerService(self.test_context, self.kafka, processing_guarantee),
@@ -92,7 +92,7 @@ class StreamsEosTest(KafkaTest):
 
     @cluster(num_nodes=9)
     @parametrize(processing_guarantee="exactly_once")
-    @parametrize(processing_guarantee="exactly_once_beta")
+    @parametrize(processing_guarantee="exactly_once_v2")
     def test_failure_and_recovery_complex(self, processing_guarantee):
         self.run_failure_and_recovery(StreamsComplexEosTestJobRunnerService(self.test_context, self.kafka, processing_guarantee),
                                       StreamsComplexEosTestJobRunnerService(self.test_context, self.kafka, processing_guarantee),
diff --git a/tests/kafkatest/tests/streams/streams_relational_smoke_test.py b/tests/kafkatest/tests/streams/streams_relational_smoke_test.py
index a078f5e..fe10a29 100644
--- a/tests/kafkatest/tests/streams/streams_relational_smoke_test.py
+++ b/tests/kafkatest/tests/streams/streams_relational_smoke_test.py
@@ -85,11 +85,11 @@ class StreamsRelationalSmokeTest(KafkaTest):
 
     @cluster(num_nodes=8)
     @matrix(crash=[False, True],
-            processing_guarantee=['exactly_once', 'exactly_once_beta'])
+            processing_guarantee=['exactly_once', 'exactly_once_v2'])
     def test_streams(self, crash, processing_guarantee):
         driver = StreamsRelationalSmokeTestService(self.test_context, self.kafka, "driver", "ignored", "ignored")
 
-        LOG_FILE = driver.LOG_FILE  # this is the same for all instaces of the service, so we can just declare a "constant"
+        LOG_FILE = driver.LOG_FILE  # this is the same for all instances of the service, so we can just declare a "constant"
 
         processor1 = StreamsRelationalSmokeTestService(self.test_context, self.kafka, "application", "processor1", processing_guarantee)
         processor2 = StreamsRelationalSmokeTestService(self.test_context, self.kafka, "application", "processor2", processing_guarantee)