You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ew...@apache.org on 2016/03/08 08:18:41 UTC

kafka git commit: KAFKA-3201: Added rolling upgrade system tests from 0.8 and 0.9 to 0.10

Repository: kafka
Updated Branches:
  refs/heads/trunk 8d0c298c8 -> f6e35dec9


KAFKA-3201: Added rolling upgrade system tests from 0.8 and 0.9 to 0.10

Three main tests:
1. Setup: Producer (0.8) → Kafka Cluster → Consumer (0.8)
First rolling bounce: Set inter.broker.protocol.version = 0.8 and message.format.version = 0.8
Second rolling bounce: use latest (default) inter.broker.protocol.version and message.format.version
2. Setup: Producer (0.9) → Kafka Cluster → Consumer (0.9)
First rolling bounce: Set inter.broker.protocol.version = 0.9 and message.format.version = 0.9
Second rolling bounce: use latest (default) inter.broker.protocol.version and message.format.version
3. Setup: Producer (0.9) → Kafka Cluster → Consumer (0.9)
First rolling bounce: Set inter.broker.protocol.version = 0.9 and message.format.version = 0.9
Second rolling bounce: use inter.broker.protocol.version = 0.10 and message.format.version = 0.9

Plus a couple of variations of these tests using the old/new consumer and no compression / snappy compression.

Author: Anna Povzner <an...@confluent.io>

Reviewers: Ewen Cheslack-Postava <ew...@confluent.io>

Closes #980 from apovzner/kafka-3201-02


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/f6e35dec
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/f6e35dec
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/f6e35dec

Branch: refs/heads/trunk
Commit: f6e35dec9bf330c3531fd95c6566070d4ddf0457
Parents: 8d0c298
Author: Anna Povzner <an...@confluent.io>
Authored: Mon Mar 7 23:18:17 2016 -0800
Committer: Ewen Cheslack-Postava <me...@ewencp.org>
Committed: Mon Mar 7 23:18:17 2016 -0800

----------------------------------------------------------------------
 .../sanity_checks/test_verifiable_producer.py   |  8 ++-
 tests/kafkatest/services/console_consumer.py    |  6 +-
 .../kafkatest/services/kafka/config_property.py |  2 +
 tests/kafkatest/services/kafka/version.py       |  4 ++
 .../kafkatest/tests/reassign_partitions_test.py |  2 +-
 tests/kafkatest/tests/upgrade_test.py           | 70 ++++++++++++++------
 vagrant/base.sh                                 |  1 +
 7 files changed, 65 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/f6e35dec/tests/kafkatest/sanity_checks/test_verifiable_producer.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/sanity_checks/test_verifiable_producer.py b/tests/kafkatest/sanity_checks/test_verifiable_producer.py
index 4155279..e22d422 100644
--- a/tests/kafkatest/sanity_checks/test_verifiable_producer.py
+++ b/tests/kafkatest/sanity_checks/test_verifiable_producer.py
@@ -20,7 +20,7 @@ from ducktape.mark import parametrize
 
 from kafkatest.services.zookeeper import ZookeeperService
 from kafkatest.services.kafka import KafkaService
-from kafkatest.services.kafka.version import LATEST_0_8_2, TRUNK, KafkaVersion
+from kafkatest.services.kafka.version import LATEST_0_8_2, LATEST_0_9, TRUNK, KafkaVersion
 from kafkatest.services.verifiable_producer import VerifiableProducer
 from kafkatest.utils import is_version
 
@@ -45,6 +45,7 @@ class TestVerifiableProducer(Test):
         self.kafka.start()
 
     @parametrize(producer_version=str(LATEST_0_8_2))
+    @parametrize(producer_version=str(LATEST_0_9))
     @parametrize(producer_version=str(TRUNK))
     def test_simple_run(self, producer_version=TRUNK):
         """
@@ -61,7 +62,10 @@ class TestVerifiableProducer(Test):
         # that this check works with TRUNK
         # When running VerifiableProducer 0.8.X, both trunk version and 0.8.X should show up because of the way
         # verifiable producer pulls in some trunk directories into its classpath
-        assert is_version(node, [node.version.vstring, TRUNK.vstring])
+        if node.version <= LATEST_0_8_2:
+            assert is_version(node, [node.version.vstring, TRUNK.vstring])
+        else:
+            assert is_version(node, [node.version.vstring])
 
         self.producer.wait()
         num_produced = self.producer.num_acked

http://git-wip-us.apache.org/repos/asf/kafka/blob/f6e35dec/tests/kafkatest/services/console_consumer.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/console_consumer.py b/tests/kafkatest/services/console_consumer.py
index a3bc2fd..e5f2196 100644
--- a/tests/kafkatest/services/console_consumer.py
+++ b/tests/kafkatest/services/console_consumer.py
@@ -17,7 +17,7 @@ from ducktape.utils.util import wait_until
 from ducktape.services.background_thread import BackgroundThreadService
 
 from kafkatest.services.kafka.directory import kafka_dir
-from kafkatest.services.kafka.version import TRUNK, LATEST_0_8_2
+from kafkatest.services.kafka.version import TRUNK, LATEST_0_8_2, LATEST_0_9
 from kafkatest.services.monitor.jmx import JmxMixin
 from kafkatest.services.security.security_config import SecurityConfig
 
@@ -181,8 +181,8 @@ class ConsoleConsumer(JmxMixin, BackgroundThreadService):
         if self.print_key:
             cmd += " --property print.key=true"
 
-        # LoggingMessageFormatter was introduced in 0.9.0.0
-        if node.version > LATEST_0_8_2:
+        # LoggingMessageFormatter was introduced after 0.9
+        if node.version > LATEST_0_9:
             cmd+=" --formatter kafka.tools.LoggingMessageFormatter"
 
         cmd += " 2>> %(stderr)s | tee -a %(stdout)s &" % args

http://git-wip-us.apache.org/repos/asf/kafka/blob/f6e35dec/tests/kafkatest/services/kafka/config_property.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/kafka/config_property.py b/tests/kafkatest/services/kafka/config_property.py
index cc685aa..b2b1d05 100644
--- a/tests/kafkatest/services/kafka/config_property.py
+++ b/tests/kafkatest/services/kafka/config_property.py
@@ -40,6 +40,8 @@ AUTO_CREATE_TOPICS_ENABLE = "auto.create.topics.enable"
 ZOOKEEPER_CONNECT = "zookeeper.connect"
 ZOOKEEPER_CONNECTION_TIMEOUT_MS = "zookeeper.connection.timeout.ms"
 INTER_BROKER_PROTOCOL_VERSION = "inter.broker.protocol.version"
+MESSAGE_FORMAT_VERSION = "message.format.version"
+
 
 
 """

http://git-wip-us.apache.org/repos/asf/kafka/blob/f6e35dec/tests/kafkatest/services/kafka/version.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/kafka/version.py b/tests/kafkatest/services/kafka/version.py
index 95f3448..761d91b 100644
--- a/tests/kafkatest/services/kafka/version.py
+++ b/tests/kafkatest/services/kafka/version.py
@@ -59,3 +59,7 @@ V_0_8_2_1 = KafkaVersion("0.8.2.1")
 V_0_8_2_2 = KafkaVersion("0.8.2.2")
 LATEST_0_8_2 = V_0_8_2_2
 
+# 0.9.0.X versions
+V_0_9_0_0 = KafkaVersion("0.9.0.0")
+V_0_9_0_1 = KafkaVersion("0.9.0.1")
+LATEST_0_9 = V_0_9_0_1

http://git-wip-us.apache.org/repos/asf/kafka/blob/f6e35dec/tests/kafkatest/tests/reassign_partitions_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/reassign_partitions_test.py b/tests/kafkatest/tests/reassign_partitions_test.py
index fc0b9d6..24ce097 100644
--- a/tests/kafkatest/tests/reassign_partitions_test.py
+++ b/tests/kafkatest/tests/reassign_partitions_test.py
@@ -107,4 +107,4 @@ class ReassignPartitionsTest(ProduceConsumeValidateTest):
         self.consumer = ConsoleConsumer(self.test_context, self.num_consumers, self.kafka, self.topic, new_consumer=new_consumer, consumer_timeout_ms=60000, message_validator=is_int)
         self.kafka.start()
         
-        self.run_produce_consume_validate(core_test_action=self.reassign_partitions(bounce_brokers))
+        self.run_produce_consume_validate(core_test_action=lambda: self.reassign_partitions(bounce_brokers))

http://git-wip-us.apache.org/repos/asf/kafka/blob/f6e35dec/tests/kafkatest/tests/upgrade_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/upgrade_test.py b/tests/kafkatest/tests/upgrade_test.py
index 20be4f2..bec4b3f 100644
--- a/tests/kafkatest/tests/upgrade_test.py
+++ b/tests/kafkatest/tests/upgrade_test.py
@@ -13,9 +13,10 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+from ducktape.mark import parametrize
 from kafkatest.services.zookeeper import ZookeeperService
 from kafkatest.services.kafka import KafkaService
-from kafkatest.services.kafka.version import LATEST_0_8_2, TRUNK
+from kafkatest.services.kafka.version import LATEST_0_8_2, LATEST_0_9, TRUNK, KafkaVersion
 from kafkatest.services.verifiable_producer import VerifiableProducer
 from kafkatest.services.console_consumer import ConsoleConsumer
 from kafkatest.services.kafka import config_property
@@ -31,49 +32,74 @@ class TestUpgrade(ProduceConsumeValidateTest):
     def setUp(self):
         self.topic = "test_topic"
         self.zk = ZookeeperService(self.test_context, num_nodes=1)
-        self.kafka = KafkaService(self.test_context, num_nodes=3, zk=self.zk, version=LATEST_0_8_2, topics={self.topic: {
-                                                                    "partitions": 3,
-                                                                    "replication-factor": 3,
-                                                                    'configs': {"min.insync.replicas": 2}}})
         self.zk.start()
-        self.kafka.start()
 
         # Producer and consumer
         self.producer_throughput = 10000
         self.num_producers = 1
         self.num_consumers = 1
-        self.producer = VerifiableProducer(
-            self.test_context, self.num_producers, self.kafka, self.topic,
-            throughput=self.producer_throughput, version=LATEST_0_8_2)
-
-        # TODO - reduce the timeout
-        self.consumer = ConsoleConsumer(
-            self.test_context, self.num_consumers, self.kafka, self.topic,
-            consumer_timeout_ms=30000, message_validator=is_int, version=LATEST_0_8_2)
 
-    def perform_upgrade(self):
+    def perform_upgrade(self, from_kafka_version, to_message_format_version=None):
         self.logger.info("First pass bounce - rolling upgrade")
         for node in self.kafka.nodes:
             self.kafka.stop_node(node)
             node.version = TRUNK
-            node.config[config_property.INTER_BROKER_PROTOCOL_VERSION] = "0.8.2.X"
+            node.config[config_property.INTER_BROKER_PROTOCOL_VERSION] = from_kafka_version
+            node.config[config_property.MESSAGE_FORMAT_VERSION] = from_kafka_version
             self.kafka.start_node(node)
 
         self.logger.info("Second pass bounce - remove inter.broker.protocol.version config")
         for node in self.kafka.nodes:
             self.kafka.stop_node(node)
             del node.config[config_property.INTER_BROKER_PROTOCOL_VERSION]
+            if to_message_format_version is None:
+                del node.config[config_property.MESSAGE_FORMAT_VERSION]
+            else:
+                node.config[config_property.MESSAGE_FORMAT_VERSION] = to_message_format_version
             self.kafka.start_node(node)
 
-    def test_upgrade(self):
-        """Test upgrade of Kafka broker cluster from 0.8.2 to 0.9.0
 
-        - Start 3 node broker cluster on version 0.8.2
+    @parametrize(from_kafka_version=str(LATEST_0_9), to_message_format_version=None, compression_types=["none"])
+    @parametrize(from_kafka_version=str(LATEST_0_9), to_message_format_version=None, compression_types=["snappy"], new_consumer=True)
+    @parametrize(from_kafka_version=str(LATEST_0_9), to_message_format_version=str(LATEST_0_9), compression_types=["none"])
+    @parametrize(from_kafka_version=str(LATEST_0_9), to_message_format_version=str(LATEST_0_9), compression_types=["snappy"], new_consumer=True)
+    @parametrize(from_kafka_version=str(LATEST_0_8_2), to_message_format_version=None, compression_types=["none"])
+    @parametrize(from_kafka_version=str(LATEST_0_8_2), to_message_format_version=None, compression_types=["snappy"])
+    def test_upgrade(self, from_kafka_version, to_message_format_version, compression_types, new_consumer=False):
+        """Test upgrade of Kafka broker cluster from 0.8.2 or 0.9.0 to 0.10
+
+        from_kafka_version is a Kafka version to upgrade from: either 0.8.2.X or 0.9
+
+        If to_message_format_version is None, it means that we will upgrade to default (latest)
+        message format version. It is possible to upgrade to 0.10 brokers but still use message
+        format version 0.9
+
+        - Start 3 node broker cluster on version 'from_kafka_version'
         - Start producer and consumer in the background
         - Perform two-phase rolling upgrade
-            - First phase: upgrade brokers to 0.9.0 with inter.broker.protocol.version set to 0.8.2.X
-            - Second phase: remove inter.broker.protocol.version config with rolling bounce
+            - First phase: upgrade brokers to 0.10 with inter.broker.protocol.version set to
+            from_kafka_version and message.format.version set to from_kafka_version
+            - Second phase: remove inter.broker.protocol.version config with rolling bounce; if
+            to_message_format_version is set to 0.9, set message.format.version to
+            to_message_format_version, otherwise remove message.format.version config
         - Finally, validate that every message acked by the producer was consumed by the consumer
         """
+        self.kafka = KafkaService(self.test_context, num_nodes=3, zk=self.zk,
+                                  version=KafkaVersion(from_kafka_version),
+                                  topics={self.topic: {"partitions": 3, "replication-factor": 3,
+                                                       'configs': {"min.insync.replicas": 2}}})
+        self.kafka.start()
+
+        self.producer = VerifiableProducer(self.test_context, self.num_producers, self.kafka,
+                                           self.topic, throughput=self.producer_throughput,
+                                           message_validator=is_int,
+                                           compression_types=compression_types,
+                                           version=KafkaVersion(from_kafka_version))
+
+        # TODO - reduce the timeout
+        self.consumer = ConsoleConsumer(self.test_context, self.num_consumers, self.kafka,
+                                        self.topic, consumer_timeout_ms=30000, new_consumer=new_consumer,
+                                        message_validator=is_int, version=KafkaVersion(from_kafka_version))
 
-        self.run_produce_consume_validate(core_test_action=self.perform_upgrade)
+        self.run_produce_consume_validate(core_test_action=lambda: self.perform_upgrade(from_kafka_version,
+                                                                                        to_message_format_version))

http://git-wip-us.apache.org/repos/asf/kafka/blob/f6e35dec/vagrant/base.sh
----------------------------------------------------------------------
diff --git a/vagrant/base.sh b/vagrant/base.sh
index 8e1e1c6..d271f87 100644
--- a/vagrant/base.sh
+++ b/vagrant/base.sh
@@ -63,6 +63,7 @@ get_kafka() {
 }
 
 get_kafka 0.8.2.2
+get_kafka 0.9.0.1
 
 # For EC2 nodes, we want to use /mnt, which should have the local disk. On local
 # VMs, we can just create it if it doesn't exist and use it like we'd use