You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ew...@apache.org on 2016/03/08 08:18:41 UTC
kafka git commit: KAFKA-3201: Added rolling upgrade system tests from
0.8 and 0.9 to 0.10
Repository: kafka
Updated Branches:
refs/heads/trunk 8d0c298c8 -> f6e35dec9
KAFKA-3201: Added rolling upgrade system tests from 0.8 and 0.9 to 0.10
Three main tests:
1. Setup: Producer (0.8) → Kafka Cluster → Consumer (0.8)
First rolling bounce: Set inter.broker.protocol.version = 0.8 and message.format.version = 0.8
Second rolling bonus, use latest (default) inter.broker.protocol.version and message.format.version
2. Setup: Producer (0.9) → Kafka Cluster → Consumer (0.9)
First rolling bounce: Set inter.broker.protocol.version = 0.9 and message.format.version = 0.9
Second rolling bonus, use latest (default) inter.broker.protocol.version and message.format.version
3. Setup: Producer (0.9) → Kafka Cluster → Consumer (0.9)
First rolling bounce: Set inter.broker.protocol.version = 0.9 and message.format.version = 0.9
Second rolling bonus: use inter.broker.protocol.version = 0.10 and message.format.version = 0.9
Plus couple of variations of these tests using old/new consumer and no compression / snappy compression.
Author: Anna Povzner <an...@confluent.io>
Reviewers: Ewen Cheslack-Postava <ew...@confluent.io>
Closes #980 from apovzner/kafka-3201-02
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/f6e35dec
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/f6e35dec
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/f6e35dec
Branch: refs/heads/trunk
Commit: f6e35dec9bf330c3531fd95c6566070d4ddf0457
Parents: 8d0c298
Author: Anna Povzner <an...@confluent.io>
Authored: Mon Mar 7 23:18:17 2016 -0800
Committer: Ewen Cheslack-Postava <me...@ewencp.org>
Committed: Mon Mar 7 23:18:17 2016 -0800
----------------------------------------------------------------------
.../sanity_checks/test_verifiable_producer.py | 8 ++-
tests/kafkatest/services/console_consumer.py | 6 +-
.../kafkatest/services/kafka/config_property.py | 2 +
tests/kafkatest/services/kafka/version.py | 4 ++
.../kafkatest/tests/reassign_partitions_test.py | 2 +-
tests/kafkatest/tests/upgrade_test.py | 70 ++++++++++++++------
vagrant/base.sh | 1 +
7 files changed, 65 insertions(+), 28 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/f6e35dec/tests/kafkatest/sanity_checks/test_verifiable_producer.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/sanity_checks/test_verifiable_producer.py b/tests/kafkatest/sanity_checks/test_verifiable_producer.py
index 4155279..e22d422 100644
--- a/tests/kafkatest/sanity_checks/test_verifiable_producer.py
+++ b/tests/kafkatest/sanity_checks/test_verifiable_producer.py
@@ -20,7 +20,7 @@ from ducktape.mark import parametrize
from kafkatest.services.zookeeper import ZookeeperService
from kafkatest.services.kafka import KafkaService
-from kafkatest.services.kafka.version import LATEST_0_8_2, TRUNK, KafkaVersion
+from kafkatest.services.kafka.version import LATEST_0_8_2, LATEST_0_9, TRUNK, KafkaVersion
from kafkatest.services.verifiable_producer import VerifiableProducer
from kafkatest.utils import is_version
@@ -45,6 +45,7 @@ class TestVerifiableProducer(Test):
self.kafka.start()
@parametrize(producer_version=str(LATEST_0_8_2))
+ @parametrize(producer_version=str(LATEST_0_9))
@parametrize(producer_version=str(TRUNK))
def test_simple_run(self, producer_version=TRUNK):
"""
@@ -61,7 +62,10 @@ class TestVerifiableProducer(Test):
# that this check works with TRUNK
# When running VerifiableProducer 0.8.X, both trunk version and 0.8.X should show up because of the way
# verifiable producer pulls in some trunk directories into its classpath
- assert is_version(node, [node.version.vstring, TRUNK.vstring])
+ if node.version <= LATEST_0_8_2:
+ assert is_version(node, [node.version.vstring, TRUNK.vstring])
+ else:
+ assert is_version(node, [node.version.vstring])
self.producer.wait()
num_produced = self.producer.num_acked
http://git-wip-us.apache.org/repos/asf/kafka/blob/f6e35dec/tests/kafkatest/services/console_consumer.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/console_consumer.py b/tests/kafkatest/services/console_consumer.py
index a3bc2fd..e5f2196 100644
--- a/tests/kafkatest/services/console_consumer.py
+++ b/tests/kafkatest/services/console_consumer.py
@@ -17,7 +17,7 @@ from ducktape.utils.util import wait_until
from ducktape.services.background_thread import BackgroundThreadService
from kafkatest.services.kafka.directory import kafka_dir
-from kafkatest.services.kafka.version import TRUNK, LATEST_0_8_2
+from kafkatest.services.kafka.version import TRUNK, LATEST_0_8_2, LATEST_0_9
from kafkatest.services.monitor.jmx import JmxMixin
from kafkatest.services.security.security_config import SecurityConfig
@@ -181,8 +181,8 @@ class ConsoleConsumer(JmxMixin, BackgroundThreadService):
if self.print_key:
cmd += " --property print.key=true"
- # LoggingMessageFormatter was introduced in 0.9.0.0
- if node.version > LATEST_0_8_2:
+ # LoggingMessageFormatter was introduced after 0.9
+ if node.version > LATEST_0_9:
cmd+=" --formatter kafka.tools.LoggingMessageFormatter"
cmd += " 2>> %(stderr)s | tee -a %(stdout)s &" % args
http://git-wip-us.apache.org/repos/asf/kafka/blob/f6e35dec/tests/kafkatest/services/kafka/config_property.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/kafka/config_property.py b/tests/kafkatest/services/kafka/config_property.py
index cc685aa..b2b1d05 100644
--- a/tests/kafkatest/services/kafka/config_property.py
+++ b/tests/kafkatest/services/kafka/config_property.py
@@ -40,6 +40,8 @@ AUTO_CREATE_TOPICS_ENABLE = "auto.create.topics.enable"
ZOOKEEPER_CONNECT = "zookeeper.connect"
ZOOKEEPER_CONNECTION_TIMEOUT_MS = "zookeeper.connection.timeout.ms"
INTER_BROKER_PROTOCOL_VERSION = "inter.broker.protocol.version"
+MESSAGE_FORMAT_VERSION = "message.format.version"
+
"""
http://git-wip-us.apache.org/repos/asf/kafka/blob/f6e35dec/tests/kafkatest/services/kafka/version.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/kafka/version.py b/tests/kafkatest/services/kafka/version.py
index 95f3448..761d91b 100644
--- a/tests/kafkatest/services/kafka/version.py
+++ b/tests/kafkatest/services/kafka/version.py
@@ -59,3 +59,7 @@ V_0_8_2_1 = KafkaVersion("0.8.2.1")
V_0_8_2_2 = KafkaVersion("0.8.2.2")
LATEST_0_8_2 = V_0_8_2_2
+# 0.9.0.X versions
+V_0_9_0_0 = KafkaVersion("0.9.0.0")
+V_0_9_0_1 = KafkaVersion("0.9.0.1")
+LATEST_0_9 = V_0_9_0_1
http://git-wip-us.apache.org/repos/asf/kafka/blob/f6e35dec/tests/kafkatest/tests/reassign_partitions_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/reassign_partitions_test.py b/tests/kafkatest/tests/reassign_partitions_test.py
index fc0b9d6..24ce097 100644
--- a/tests/kafkatest/tests/reassign_partitions_test.py
+++ b/tests/kafkatest/tests/reassign_partitions_test.py
@@ -107,4 +107,4 @@ class ReassignPartitionsTest(ProduceConsumeValidateTest):
self.consumer = ConsoleConsumer(self.test_context, self.num_consumers, self.kafka, self.topic, new_consumer=new_consumer, consumer_timeout_ms=60000, message_validator=is_int)
self.kafka.start()
- self.run_produce_consume_validate(core_test_action=self.reassign_partitions(bounce_brokers))
+ self.run_produce_consume_validate(core_test_action=lambda: self.reassign_partitions(bounce_brokers))
http://git-wip-us.apache.org/repos/asf/kafka/blob/f6e35dec/tests/kafkatest/tests/upgrade_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/upgrade_test.py b/tests/kafkatest/tests/upgrade_test.py
index 20be4f2..bec4b3f 100644
--- a/tests/kafkatest/tests/upgrade_test.py
+++ b/tests/kafkatest/tests/upgrade_test.py
@@ -13,9 +13,10 @@
# See the License for the specific language governing permissions and
# limitations under the License.
+from ducktape.mark import parametrize
from kafkatest.services.zookeeper import ZookeeperService
from kafkatest.services.kafka import KafkaService
-from kafkatest.services.kafka.version import LATEST_0_8_2, TRUNK
+from kafkatest.services.kafka.version import LATEST_0_8_2, LATEST_0_9, TRUNK, KafkaVersion
from kafkatest.services.verifiable_producer import VerifiableProducer
from kafkatest.services.console_consumer import ConsoleConsumer
from kafkatest.services.kafka import config_property
@@ -31,49 +32,74 @@ class TestUpgrade(ProduceConsumeValidateTest):
def setUp(self):
self.topic = "test_topic"
self.zk = ZookeeperService(self.test_context, num_nodes=1)
- self.kafka = KafkaService(self.test_context, num_nodes=3, zk=self.zk, version=LATEST_0_8_2, topics={self.topic: {
- "partitions": 3,
- "replication-factor": 3,
- 'configs': {"min.insync.replicas": 2}}})
self.zk.start()
- self.kafka.start()
# Producer and consumer
self.producer_throughput = 10000
self.num_producers = 1
self.num_consumers = 1
- self.producer = VerifiableProducer(
- self.test_context, self.num_producers, self.kafka, self.topic,
- throughput=self.producer_throughput, version=LATEST_0_8_2)
-
- # TODO - reduce the timeout
- self.consumer = ConsoleConsumer(
- self.test_context, self.num_consumers, self.kafka, self.topic,
- consumer_timeout_ms=30000, message_validator=is_int, version=LATEST_0_8_2)
- def perform_upgrade(self):
+ def perform_upgrade(self, from_kafka_version, to_message_format_version=None):
self.logger.info("First pass bounce - rolling upgrade")
for node in self.kafka.nodes:
self.kafka.stop_node(node)
node.version = TRUNK
- node.config[config_property.INTER_BROKER_PROTOCOL_VERSION] = "0.8.2.X"
+ node.config[config_property.INTER_BROKER_PROTOCOL_VERSION] = from_kafka_version
+ node.config[config_property.MESSAGE_FORMAT_VERSION] = from_kafka_version
self.kafka.start_node(node)
self.logger.info("Second pass bounce - remove inter.broker.protocol.version config")
for node in self.kafka.nodes:
self.kafka.stop_node(node)
del node.config[config_property.INTER_BROKER_PROTOCOL_VERSION]
+ if to_message_format_version is None:
+ del node.config[config_property.MESSAGE_FORMAT_VERSION]
+ else:
+ node.config[config_property.MESSAGE_FORMAT_VERSION] = to_message_format_version
self.kafka.start_node(node)
- def test_upgrade(self):
- """Test upgrade of Kafka broker cluster from 0.8.2 to 0.9.0
- - Start 3 node broker cluster on version 0.8.2
+ @parametrize(from_kafka_version=str(LATEST_0_9), to_message_format_version=None, compression_types=["none"])
+ @parametrize(from_kafka_version=str(LATEST_0_9), to_message_format_version=None, compression_types=["snappy"], new_consumer=True)
+ @parametrize(from_kafka_version=str(LATEST_0_9), to_message_format_version=str(LATEST_0_9), compression_types=["none"])
+ @parametrize(from_kafka_version=str(LATEST_0_9), to_message_format_version=str(LATEST_0_9), compression_types=["snappy"], new_consumer=True)
+ @parametrize(from_kafka_version=str(LATEST_0_8_2), to_message_format_version=None, compression_types=["none"])
+ @parametrize(from_kafka_version=str(LATEST_0_8_2), to_message_format_version=None, compression_types=["snappy"])
+ def test_upgrade(self, from_kafka_version, to_message_format_version, compression_types, new_consumer=False):
+ """Test upgrade of Kafka broker cluster from 0.8.2 or 0.9.0 to 0.10
+
+ from_kafka_version is a Kafka version to upgrade from: either 0.8.2.X or 0.9
+
+ If to_message_format_version is None, it means that we will upgrade to default (latest)
+ message format version. It is possible to upgrade to 0.10 brokers but still use message
+ format version 0.9
+
+ - Start 3 node broker cluster on version 'from_kafka_version'
- Start producer and consumer in the background
- Perform two-phase rolling upgrade
- - First phase: upgrade brokers to 0.9.0 with inter.broker.protocol.version set to 0.8.2.X
- - Second phase: remove inter.broker.protocol.version config with rolling bounce
+ - First phase: upgrade brokers to 0.10 with inter.broker.protocol.version set to
+ from_kafka_version and message.format.version set to from_kafka_version
+ - Second phase: remove inter.broker.protocol.version config with rolling bounce; if
+ to_message_format_version is set to 0.9, set message.format.version to
+ to_message_format_version, otherwise remove message.format.version config
- Finally, validate that every message acked by the producer was consumed by the consumer
"""
+ self.kafka = KafkaService(self.test_context, num_nodes=3, zk=self.zk,
+ version=KafkaVersion(from_kafka_version),
+ topics={self.topic: {"partitions": 3, "replication-factor": 3,
+ 'configs': {"min.insync.replicas": 2}}})
+ self.kafka.start()
+
+ self.producer = VerifiableProducer(self.test_context, self.num_producers, self.kafka,
+ self.topic, throughput=self.producer_throughput,
+ message_validator=is_int,
+ compression_types=compression_types,
+ version=KafkaVersion(from_kafka_version))
+
+ # TODO - reduce the timeout
+ self.consumer = ConsoleConsumer(self.test_context, self.num_consumers, self.kafka,
+ self.topic, consumer_timeout_ms=30000, new_consumer=new_consumer,
+ message_validator=is_int, version=KafkaVersion(from_kafka_version))
- self.run_produce_consume_validate(core_test_action=self.perform_upgrade)
+ self.run_produce_consume_validate(core_test_action=lambda: self.perform_upgrade(from_kafka_version,
+ to_message_format_version))
http://git-wip-us.apache.org/repos/asf/kafka/blob/f6e35dec/vagrant/base.sh
----------------------------------------------------------------------
diff --git a/vagrant/base.sh b/vagrant/base.sh
index 8e1e1c6..d271f87 100644
--- a/vagrant/base.sh
+++ b/vagrant/base.sh
@@ -63,6 +63,7 @@ get_kafka() {
}
get_kafka 0.8.2.2
+get_kafka 0.9.0.1
# For EC2 nodes, we want to use /mnt, which should have the local disk. On local
# VMs, we can just create it if it doesn't exist and use it like we'd use