You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ca...@apache.org on 2021/09/27 15:06:07 UTC
[kafka] branch trunk updated: MINOR: replace deprecated
exactly_once_beta into exactly_once_v2 (#10884)
This is an automated email from the ASF dual-hosted git repository.
cadonna pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 1af1c80 MINOR: replace deprecated exactly_once_beta into exactly_once_v2 (#10884)
1af1c80 is described below
commit 1af1c80e2de561e012313d69649635d871e43181
Author: Luke Chen <sh...@gmail.com>
AuthorDate: Mon Sep 27 23:02:48 2021 +0800
MINOR: replace deprecated exactly_once_beta into exactly_once_v2 (#10884)
Replace deprecated exactly_once_beta with exactly_once_v2 in system tests.
Follow-up to #10870: some system tests were still using the deprecated exactly_once_beta. This PR updates them.
Reviewers: Bruno Cadonna <ca...@apache.org>
---
.../streams/streams_broker_compatibility_test.py | 47 ++++++++++++----------
tests/kafkatest/tests/streams/streams_eos_test.py | 8 ++--
.../tests/streams/streams_relational_smoke_test.py | 4 +-
3 files changed, 31 insertions(+), 28 deletions(-)
diff --git a/tests/kafkatest/tests/streams/streams_broker_compatibility_test.py b/tests/kafkatest/tests/streams/streams_broker_compatibility_test.py
index 1e7ece7..83b0735 100644
--- a/tests/kafkatest/tests/streams/streams_broker_compatibility_test.py
+++ b/tests/kafkatest/tests/streams/streams_broker_compatibility_test.py
@@ -22,7 +22,8 @@ from kafkatest.services.streams import StreamsBrokerCompatibilityService
from kafkatest.services.verifiable_consumer import VerifiableConsumer
from kafkatest.services.zookeeper import ZookeeperService
from kafkatest.version import LATEST_0_11_0, LATEST_0_10_2, LATEST_0_10_1, LATEST_0_10_0, LATEST_1_0, LATEST_1_1, \
- LATEST_2_0, LATEST_2_1, LATEST_2_2, LATEST_2_3, LATEST_2_4, LATEST_2_5, LATEST_2_6, KafkaVersion
+ LATEST_2_0, LATEST_2_1, LATEST_2_2, LATEST_2_3, LATEST_2_4, LATEST_2_5, LATEST_2_6, LATEST_2_7, LATEST_2_8, \
+ KafkaVersion
class StreamsBrokerCompatibility(Test):
@@ -30,9 +31,9 @@ class StreamsBrokerCompatibility(Test):
These tests validates that
- Streams works for older brokers 0.11 (or newer)
- Streams w/ EOS-alpha works for older brokers 0.11 (or newer)
- - Streams w/ EOS-beta works for older brokers 2.5 (or newer)
+ - Streams w/ EOS-v2 works for older brokers 2.5 (or newer)
- Streams fails fast for older brokers 0.10.0, 0.10.2, and 0.10.1
- - Streams w/ EOS-beta fails fast for older brokers 2.4 or older
+ - Streams w/ EOS-v2 fails fast for older brokers 2.4 or older
"""
input = "brokerCompatibilitySourceTopic"
@@ -114,23 +115,25 @@ class StreamsBrokerCompatibility(Test):
self.consumer.stop()
self.kafka.stop()
- # TODO enable after 2.5 is released
- # @parametrize(broker_version=str(LATEST_2_5))
- # def test_compatible_brokers_eos_beta_enabled(self, broker_version):
- # self.kafka.set_version(KafkaVersion(broker_version))
- # self.kafka.start()
- #
- # processor = StreamsBrokerCompatibilityService(self.test_context, self.kafka, "exactly_once_beta")
- # processor.start()
- #
- # self.consumer.start()
- #
- # processor.wait()
- #
- # wait_until(lambda: self.consumer.total_consumed() > 0, timeout_sec=30, err_msg="Did expect to read a message but got none within 30 seconds.")
- #
- # self.consumer.stop()
- # self.kafka.stop()
+ @parametrize(broker_version=str(LATEST_2_8))
+ @parametrize(broker_version=str(LATEST_2_7))
+ @parametrize(broker_version=str(LATEST_2_6))
+ @parametrize(broker_version=str(LATEST_2_5))
+ def test_compatible_brokers_eos_v2_enabled(self, broker_version):
+ self.kafka.set_version(KafkaVersion(broker_version))
+ self.kafka.start()
+
+ processor = StreamsBrokerCompatibilityService(self.test_context, self.kafka, "exactly_once_v2")
+ processor.start()
+
+ self.consumer.start()
+
+ processor.wait()
+
+ wait_until(lambda: self.consumer.total_consumed() > 0, timeout_sec=30, err_msg="Did expect to read a message but got none within 30 seconds.")
+
+ self.consumer.stop()
+ self.kafka.stop()
@cluster(num_nodes=4)
@parametrize(broker_version=str(LATEST_0_10_2))
@@ -159,11 +162,11 @@ class StreamsBrokerCompatibility(Test):
@parametrize(broker_version=str(LATEST_1_1))
@parametrize(broker_version=str(LATEST_1_0))
@parametrize(broker_version=str(LATEST_0_11_0))
- def test_fail_fast_on_incompatible_brokers_if_eos_beta_enabled(self, broker_version):
+ def test_fail_fast_on_incompatible_brokers_if_eos_v2_enabled(self, broker_version):
self.kafka.set_version(KafkaVersion(broker_version))
self.kafka.start()
- processor = StreamsBrokerCompatibilityService(self.test_context, self.kafka, "exactly_once_beta")
+ processor = StreamsBrokerCompatibilityService(self.test_context, self.kafka, "exactly_once_v2")
with processor.node.account.monitor_log(processor.STDERR_FILE) as monitor:
with processor.node.account.monitor_log(processor.LOG_FILE) as log:
diff --git a/tests/kafkatest/tests/streams/streams_eos_test.py b/tests/kafkatest/tests/streams/streams_eos_test.py
index 5e9091e..618d378 100644
--- a/tests/kafkatest/tests/streams/streams_eos_test.py
+++ b/tests/kafkatest/tests/streams/streams_eos_test.py
@@ -39,7 +39,7 @@ class StreamsEosTest(KafkaTest):
@cluster(num_nodes=9)
@parametrize(processing_guarantee="exactly_once")
- @parametrize(processing_guarantee="exactly_once_beta")
+ @parametrize(processing_guarantee="exactly_once_v2")
def test_rebalance_simple(self, processing_guarantee):
self.run_rebalance(StreamsEosTestJobRunnerService(self.test_context, self.kafka, processing_guarantee),
StreamsEosTestJobRunnerService(self.test_context, self.kafka, processing_guarantee),
@@ -48,7 +48,7 @@ class StreamsEosTest(KafkaTest):
@cluster(num_nodes=9)
@parametrize(processing_guarantee="exactly_once")
- @parametrize(processing_guarantee="exactly_once_beta")
+ @parametrize(processing_guarantee="exactly_once_v2")
def test_rebalance_complex(self, processing_guarantee):
self.run_rebalance(StreamsComplexEosTestJobRunnerService(self.test_context, self.kafka, processing_guarantee),
StreamsComplexEosTestJobRunnerService(self.test_context, self.kafka, processing_guarantee),
@@ -83,7 +83,7 @@ class StreamsEosTest(KafkaTest):
@cluster(num_nodes=9)
@parametrize(processing_guarantee="exactly_once")
- @parametrize(processing_guarantee="exactly_once_beta")
+ @parametrize(processing_guarantee="exactly_once_v2")
def test_failure_and_recovery(self, processing_guarantee):
self.run_failure_and_recovery(StreamsEosTestJobRunnerService(self.test_context, self.kafka, processing_guarantee),
StreamsEosTestJobRunnerService(self.test_context, self.kafka, processing_guarantee),
@@ -92,7 +92,7 @@ class StreamsEosTest(KafkaTest):
@cluster(num_nodes=9)
@parametrize(processing_guarantee="exactly_once")
- @parametrize(processing_guarantee="exactly_once_beta")
+ @parametrize(processing_guarantee="exactly_once_v2")
def test_failure_and_recovery_complex(self, processing_guarantee):
self.run_failure_and_recovery(StreamsComplexEosTestJobRunnerService(self.test_context, self.kafka, processing_guarantee),
StreamsComplexEosTestJobRunnerService(self.test_context, self.kafka, processing_guarantee),
diff --git a/tests/kafkatest/tests/streams/streams_relational_smoke_test.py b/tests/kafkatest/tests/streams/streams_relational_smoke_test.py
index a078f5e..fe10a29 100644
--- a/tests/kafkatest/tests/streams/streams_relational_smoke_test.py
+++ b/tests/kafkatest/tests/streams/streams_relational_smoke_test.py
@@ -85,11 +85,11 @@ class StreamsRelationalSmokeTest(KafkaTest):
@cluster(num_nodes=8)
@matrix(crash=[False, True],
- processing_guarantee=['exactly_once', 'exactly_once_beta'])
+ processing_guarantee=['exactly_once', 'exactly_once_v2'])
def test_streams(self, crash, processing_guarantee):
driver = StreamsRelationalSmokeTestService(self.test_context, self.kafka, "driver", "ignored", "ignored")
- LOG_FILE = driver.LOG_FILE # this is the same for all instaces of the service, so we can just declare a "constant"
+ LOG_FILE = driver.LOG_FILE # this is the same for all instances of the service, so we can just declare a "constant"
processor1 = StreamsRelationalSmokeTestService(self.test_context, self.kafka, "application", "processor1", processing_guarantee)
processor2 = StreamsRelationalSmokeTestService(self.test_context, self.kafka, "application", "processor2", processing_guarantee)