You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by "Guozhang Wang (JIRA)" <ji...@apache.org> on 2019/02/28 16:48:00 UTC
[jira] [Created] (KAFKA-8017) Narrow the scope of Streams'
broker-upgrade-test
Guozhang Wang created KAFKA-8017:
------------------------------------
Summary: Narrow the scope of Streams' broker-upgrade-test
Key: KAFKA-8017
URL: https://issues.apache.org/jira/browse/KAFKA-8017
Project: Kafka
Issue Type: Improvement
Components: streams
Reporter: Guozhang Wang
We had a streams-broker-upgrade test in which we kept the streams client as the dev version, and upgrade/downgrade brokers between arbitrary versions. This has several issues:
1) not all upgrade / downgrade paths are supported due to message format change.
2) even for those supported paths, we should consider the impact of inter.broker.protocol and message.format. More specifically: when upgrade to new version byte code, we should stick with the old protocol/version, when down grade to old version byte code, we should start with the old protocol/version.
A good reference to look at is the broker's own upgrade path where they listed all the possible path so far:
{code}
@parametrize(from_kafka_version=str(LATEST_1_1), to_message_format_version=None, compression_types=["none"])
@parametrize(from_kafka_version=str(LATEST_1_1), to_message_format_version=None, compression_types=["lz4"])
@parametrize(from_kafka_version=str(LATEST_1_0), to_message_format_version=None, compression_types=["none"])
@parametrize(from_kafka_version=str(LATEST_1_0), to_message_format_version=None, compression_types=["snappy"])
@parametrize(from_kafka_version=str(LATEST_0_11_0), to_message_format_version=None, compression_types=["gzip"])
@parametrize(from_kafka_version=str(LATEST_0_11_0), to_message_format_version=None, compression_types=["lz4"])
@parametrize(from_kafka_version=str(LATEST_0_10_2), to_message_format_version=str(LATEST_0_9), compression_types=["none"])
@parametrize(from_kafka_version=str(LATEST_0_10_2), to_message_format_version=str(LATEST_0_10), compression_types=["snappy"])
@parametrize(from_kafka_version=str(LATEST_0_10_2), to_message_format_version=None, compression_types=["none"])
@parametrize(from_kafka_version=str(LATEST_0_10_2), to_message_format_version=None, compression_types=["lz4"])
@parametrize(from_kafka_version=str(LATEST_0_10_1), to_message_format_version=None, compression_types=["lz4"])
@parametrize(from_kafka_version=str(LATEST_0_10_1), to_message_format_version=None, compression_types=["snappy"])
@parametrize(from_kafka_version=str(LATEST_0_10_0), to_message_format_version=None, compression_types=["snappy"])
@parametrize(from_kafka_version=str(LATEST_0_10_0), to_message_format_version=None, compression_types=["lz4"])
@cluster(num_nodes=7)
@parametrize(from_kafka_version=str(LATEST_0_9), to_message_format_version=None, compression_types=["none"], security_protocol="SASL_SSL")
@cluster(num_nodes=6)
@parametrize(from_kafka_version=str(LATEST_0_9), to_message_format_version=None, compression_types=["snappy"])
@parametrize(from_kafka_version=str(LATEST_0_9), to_message_format_version=None, compression_types=["lz4"])
@parametrize(from_kafka_version=str(LATEST_0_9), to_message_format_version=str(LATEST_0_9), compression_types=["none"])
@parametrize(from_kafka_version=str(LATEST_0_9), to_message_format_version=str(LATEST_0_9), compression_types=["lz4"])
@cluster(num_nodes=7)
@parametrize(from_kafka_version=str(LATEST_0_8_2), to_message_format_version=None, compression_types=["none"])
@parametrize(from_kafka_version=str(LATEST_0_8_2), to_message_format_version=None, compression_types=["snappy"])
def test_upgrade(self, from_kafka_version, to_message_format_version, compression_types,
security_protocol="PLAINTEXT"):
{code}
And their upgrade code is:
{code}
def perform_upgrade(self, from_kafka_version, to_message_format_version=None):
self.logger.info("First pass bounce - rolling upgrade")
for node in self.kafka.nodes:
self.kafka.stop_node(node)
node.version = DEV_BRANCH
node.config[config_property.INTER_BROKER_PROTOCOL_VERSION] = from_kafka_version
node.config[config_property.MESSAGE_FORMAT_VERSION] = from_kafka_version
self.kafka.start_node(node)
self.logger.info("Second pass bounce - remove inter.broker.protocol.version config")
for node in self.kafka.nodes:
self.kafka.stop_node(node)
del node.config[config_property.INTER_BROKER_PROTOCOL_VERSION]
if to_message_format_version is None:
del node.config[config_property.MESSAGE_FORMAT_VERSION]
else:
node.config[config_property.MESSAGE_FORMAT_VERSION] = to_message_format_version
self.kafka.start_node(node)
{code}
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)