Posted to commits@kafka.apache.org by gu...@apache.org on 2015/11/19 02:13:25 UTC

kafka git commit: KAFKA-2845: new client old broker compatibility

Repository: kafka
Updated Branches:
  refs/heads/trunk 2e91806db -> f154956a7


KAFKA-2845: new client old broker compatibility

Author: Geoff Anderson <ge...@confluent.io>

Reviewers: Ismael Juma, Guozhang Wang

Closes #537 from granders/KAFKA-2845-new-client-old-broker-compatibility


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/f154956a
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/f154956a
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/f154956a

Branch: refs/heads/trunk
Commit: f154956a76790220a9ecb84d88644076c6885683
Parents: 2e91806
Author: Geoff Anderson <ge...@confluent.io>
Authored: Wed Nov 18 17:13:21 2015 -0800
Committer: Confluent <co...@Confluents-MacBook-Pro.local>
Committed: Wed Nov 18 17:13:21 2015 -0800

----------------------------------------------------------------------
 tests/kafkatest/services/console_consumer.py |   7 +-
 tests/kafkatest/tests/compatibility_test.py  | 101 ++++++++++++++++++++++
 tests/kafkatest/tests/upgrade_test.py        |   3 -
 3 files changed, 105 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/f154956a/tests/kafkatest/services/console_consumer.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/console_consumer.py b/tests/kafkatest/services/console_consumer.py
index 84d358d..e42b20e 100644
--- a/tests/kafkatest/services/console_consumer.py
+++ b/tests/kafkatest/services/console_consumer.py
@@ -99,9 +99,9 @@ class ConsoleConsumer(JmxMixin, BackgroundThreadService):
             "collect_default": True}
     }
 
-    def __init__(self, context, num_nodes, kafka, topic, new_consumer=False, message_validator=None,
-                 from_beginning=True, consumer_timeout_ms=None, version=TRUNK, client_id="console-consumer",
-                 print_key=False, jmx_object_names=None, jmx_attributes=[]):
+    def __init__(self, context, num_nodes, kafka, topic, group_id="test-consumer-group", new_consumer=False,
+                 message_validator=None, from_beginning=True, consumer_timeout_ms=None, version=TRUNK,
+                 client_id="console-consumer", print_key=False, jmx_object_names=None, jmx_attributes=[]):
         """
         Args:
             context:                    standard context
@@ -121,6 +121,7 @@ class ConsoleConsumer(JmxMixin, BackgroundThreadService):
         BackgroundThreadService.__init__(self, context, num_nodes)
         self.kafka = kafka
         self.new_consumer = new_consumer
+        self.group_id = group_id
         self.args = {
             'topic': topic,
         }

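A note on the console_consumer.py change above: the new group_id keyword lets a test run several ConsoleConsumer instances against the same topic in separate consumer groups, each of which independently reads the whole topic; the new compatibility test below relies on this. A minimal usage sketch (assuming an existing ducktape test with self.test_context, self.kafka and self.topic already set up, as in the test below):

    from kafkatest.services.console_consumer import ConsoleConsumer, is_int
    from kafkatest.services.kafka.version import LATEST_0_8_2, TRUNK

    # Two consumers in distinct groups; each group independently consumes the full topic.
    new_consumer = ConsoleConsumer(
        self.test_context, num_nodes=1, kafka=self.kafka, topic=self.topic,
        group_id="group-A", consumer_timeout_ms=10000, message_validator=is_int, version=TRUNK)
    old_consumer = ConsoleConsumer(
        self.test_context, num_nodes=1, kafka=self.kafka, topic=self.topic,
        group_id="group-B", consumer_timeout_ms=10000, message_validator=is_int, version=LATEST_0_8_2)
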
http://git-wip-us.apache.org/repos/asf/kafka/blob/f154956a/tests/kafkatest/tests/compatibility_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/compatibility_test.py b/tests/kafkatest/tests/compatibility_test.py
new file mode 100644
index 0000000..0310d2f
--- /dev/null
+++ b/tests/kafkatest/tests/compatibility_test.py
@@ -0,0 +1,101 @@
+# Copyright 2015 Confluent Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ducktape.tests.test import Test
+
+from kafkatest.services.zookeeper import ZookeeperService
+from kafkatest.services.kafka import KafkaService
+from kafkatest.services.kafka.version import LATEST_0_8_2, TRUNK
+from kafkatest.services.verifiable_producer import VerifiableProducer
+from kafkatest.services.console_consumer import ConsoleConsumer, is_int
+
+
+class ClientCompatibilityTest(Test):
+
+    def __init__(self, test_context):
+        super(ClientCompatibilityTest, self).__init__(test_context=test_context)
+
+    def setUp(self):
+        self.topic = "test_topic"
+        self.zk = ZookeeperService(self.test_context, num_nodes=1)
+        self.kafka = KafkaService(self.test_context, num_nodes=3, zk=self.zk, version=LATEST_0_8_2,
+                                  topics={self.topic: {"partitions": 3,
+                                                       "replication-factor": 3,
+                                                       "min.insync.replicas": 2}})
+        self.zk.start()
+        self.kafka.start()
+
+        # Producer and consumer
+        self.producer_throughput = 10000
+        self.num_producers = 1
+        self.num_consumers = 1
+
+    def test_producer_back_compatibility(self):
+        """Run 0.9.X java producer against 0.8.X brokers.
+        This test documents the fact that java producer v0.9.0.0 and later won't run against 0.8.X brokers
+        the broker responds to a V1 produce request with a V0 fetch response; the client then tries to parse this V0
+        produce response as a V1 produce response, resulting in a BufferUnderflowException
+        """
+        self.producer = VerifiableProducer(
+            self.test_context, self.num_producers, self.kafka, self.topic, max_messages=100,
+            throughput=self.producer_throughput, version=TRUNK)
+
+        node = self.producer.nodes[0]
+        try:
+            self.producer.start()
+            self.producer.wait()
+        except Exception:
+            pass  # Expected: the 0.9.X producer cannot produce to the 0.8.X broker
+        else:
+            raise Exception("0.9.X java producer should not run successfully against 0.8.X broker")
+        finally:
+            self.producer.kill_node(node, clean_shutdown=False)
+
+        self.logger.info("Grepping producer log for expected error type")
+        node.account.ssh("egrep -m 1 %s %s" % ("\"org\.apache\.kafka\.common\.protocol\.types\.SchemaException.*throttle_time_ms.*: java\.nio\.BufferUnderflowException\"", self.producer.LOG_FILE), allow_fail=False)
+
+    def test_consumer_back_compatibility(self):
+        """Run the scala 0.8.X consumer against an 0.9.X cluster.
+        Expect 0.8.X scala consumer to fail with buffer underflow. This error is the same as when an 0.9.X producer
+        is run against an 0.8.X broker: the broker responds to a V1 fetch request with a V0 fetch response; the
+        client then tries to parse this V0 fetch response as a V1 fetch response, resulting in a BufferUnderflowException
+        """
+        num_messages = 10
+        self.producer = VerifiableProducer(
+            self.test_context, self.num_producers, self.kafka, self.topic, max_messages=num_messages,
+            throughput=self.producer_throughput, version=LATEST_0_8_2)
+
+        self.consumer = ConsoleConsumer(
+            self.test_context, self.num_consumers, self.kafka, self.topic, group_id="consumer-09X",
+            consumer_timeout_ms=10000, message_validator=is_int, version=TRUNK)
+
+        self.old_consumer = ConsoleConsumer(
+            self.test_context, self.num_consumers, self.kafka, self.topic, group_id="consumer-08X",
+            consumer_timeout_ms=10000, message_validator=is_int, version=LATEST_0_8_2)
+
+        self.producer.run()
+        self.consumer.run()
+        self.old_consumer.run()
+
+        consumed = len(self.consumer.messages_consumed[1])
+        old_consumed = len(self.old_consumer.messages_consumed[1])
+        assert old_consumed == num_messages, "Expected 0.8.X scala consumer to consume %d, but only got %d" % (num_messages, old_consumed)
+        assert consumed == 0, "Expected 0.9.X scala consumer to fail to consume any messages, but got %d" % consumed
+
+        self.logger.info("Grepping consumer log for expected error type")
+        node = self.consumer.nodes[0]
+        node.account.ssh("egrep -m 1 %s %s" % ("\"java\.nio\.BufferUnderflowException\"", self.consumer.LOG_FILE), allow_fail=False)
+
+
+

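To make the failure mode described in the docstrings above concrete, here is a toy sketch (the field layout and names below are simplified assumptions, not Kafka's actual wire format or client code): a V1 response carries an extra int32 throttle_time_ms field, so a reader that expects the V1 layout runs off the end of a V0 payload. In the JVM clients this surfaces as java.nio.BufferUnderflowException; in this Python sketch the analogous failure is struct.error.

    import struct

    # A pretend "V0" response body: int16 error_code followed by int64 offset (10 bytes total).
    v0_body = struct.pack(">hq", 0, 42)

    def parse_as_v1(buf):
        # A "V1" reader: the same fields as V0 plus a trailing int32 throttle_time_ms.
        error_code, offset = struct.unpack_from(">hq", buf, 0)
        (throttle_time_ms,) = struct.unpack_from(">i", buf, 10)  # a V0 body has no bytes here
        return error_code, offset, throttle_time_ms

    try:
        parse_as_v1(v0_body)
    except struct.error as e:
        # Analogous to the BufferUnderflowException the tests grep for in the client logs.
        print("buffer too short for V1 layout: %s" % e)
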
http://git-wip-us.apache.org/repos/asf/kafka/blob/f154956a/tests/kafkatest/tests/upgrade_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/upgrade_test.py b/tests/kafkatest/tests/upgrade_test.py
index 97605cd..245129a 100644
--- a/tests/kafkatest/tests/upgrade_test.py
+++ b/tests/kafkatest/tests/upgrade_test.py
@@ -13,7 +13,6 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-
 from kafkatest.services.zookeeper import ZookeeperService
 from kafkatest.services.kafka import KafkaService
 from kafkatest.services.kafka.version import LATEST_0_8_2, TRUNK
@@ -77,5 +76,3 @@ class TestUpgrade(ProduceConsumeValidateTest):
         """
 
         self.run_produce_consume_validate(core_test_action=self.perform_upgrade)
-
-