You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2015/11/19 02:13:25 UTC
kafka git commit: KAFKA-2845: new client old broker compatibility
Repository: kafka
Updated Branches:
refs/heads/trunk 2e91806db -> f154956a7
KAFKA-2845: new client old broker compatibility
Author: Geoff Anderson <ge...@confluent.io>
Reviewers: Ismael Juma, Guozhang Wang
Closes #537 from granders/KAFKA-2845-new-client-old-broker-compatibility
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/f154956a
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/f154956a
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/f154956a
Branch: refs/heads/trunk
Commit: f154956a76790220a9ecb84d88644076c6885683
Parents: 2e91806
Author: Geoff Anderson <ge...@confluent.io>
Authored: Wed Nov 18 17:13:21 2015 -0800
Committer: Confluent <co...@Confluents-MacBook-Pro.local>
Committed: Wed Nov 18 17:13:21 2015 -0800
----------------------------------------------------------------------
tests/kafkatest/services/console_consumer.py | 7 +-
tests/kafkatest/tests/compatibility_test.py | 101 ++++++++++++++++++++++
tests/kafkatest/tests/upgrade_test.py | 3 -
3 files changed, 105 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/f154956a/tests/kafkatest/services/console_consumer.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/console_consumer.py b/tests/kafkatest/services/console_consumer.py
index 84d358d..e42b20e 100644
--- a/tests/kafkatest/services/console_consumer.py
+++ b/tests/kafkatest/services/console_consumer.py
@@ -99,9 +99,9 @@ class ConsoleConsumer(JmxMixin, BackgroundThreadService):
"collect_default": True}
}
- def __init__(self, context, num_nodes, kafka, topic, new_consumer=False, message_validator=None,
- from_beginning=True, consumer_timeout_ms=None, version=TRUNK, client_id="console-consumer",
- print_key=False, jmx_object_names=None, jmx_attributes=[]):
+ def __init__(self, context, num_nodes, kafka, topic, group_id="test-consumer-group", new_consumer=False,
+ message_validator=None, from_beginning=True, consumer_timeout_ms=None, version=TRUNK,
+ client_id="console-consumer", print_key=False, jmx_object_names=None, jmx_attributes=[]):
"""
Args:
context: standard context
@@ -121,6 +121,7 @@ class ConsoleConsumer(JmxMixin, BackgroundThreadService):
BackgroundThreadService.__init__(self, context, num_nodes)
self.kafka = kafka
self.new_consumer = new_consumer
+ self.group_id = group_id
self.args = {
'topic': topic,
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/f154956a/tests/kafkatest/tests/compatibility_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/compatibility_test.py b/tests/kafkatest/tests/compatibility_test.py
new file mode 100644
index 0000000..0310d2f
--- /dev/null
+++ b/tests/kafkatest/tests/compatibility_test.py
@@ -0,0 +1,101 @@
+# Copyright 2015 Confluent Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ducktape.tests.test import Test
+
+from kafkatest.services.zookeeper import ZookeeperService
+from kafkatest.services.kafka import KafkaService
+from kafkatest.services.kafka.version import LATEST_0_8_2, TRUNK
+from kafkatest.services.verifiable_producer import VerifiableProducer
+from kafkatest.services.console_consumer import ConsoleConsumer, is_int
+
+
+class ClientCompatibilityTest(Test):
+
+ def __init__(self, test_context):
+ super(ClientCompatibilityTest, self).__init__(test_context=test_context)
+
+ def setUp(self):
+ self.topic = "test_topic"
+ self.zk = ZookeeperService(self.test_context, num_nodes=1)
+ self.kafka = KafkaService(self.test_context, num_nodes=3, zk=self.zk, version=LATEST_0_8_2, topics={self.topic: {
+ "partitions": 3,
+ "replication-factor": 3,
+ "min.insync.replicas": 2}})
+ self.zk.start()
+ self.kafka.start()
+
+ # Producer and consumer
+ self.producer_throughput = 10000
+ self.num_producers = 1
+ self.num_consumers = 1
+
+ def test_producer_back_compatibility(self):
+ """Run 0.9.X java producer against 0.8.X brokers.
+ This test documents the fact that java producer v0.9.0.0 and later won't run against 0.8.X brokers:
+ the broker responds to a V1 produce request with a V0 produce response; the client then tries to parse this V0
+ produce response as a V1 produce response, resulting in a BufferUnderflowException.
+ """
+ self.producer = VerifiableProducer(
+ self.test_context, self.num_producers, self.kafka, self.topic, max_messages=100,
+ throughput=self.producer_throughput, version=TRUNK)
+
+ node = self.producer.nodes[0]
+ try:
+ self.producer.start()
+ self.producer.wait()
+ raise Exception("0.9.X java producer should not run successfully against 0.8.X broker")
+ except:
+ # Expected
+ pass
+ finally:
+ self.producer.kill_node(node, clean_shutdown=False)
+
+ self.logger.info("Grepping producer log for expected error type")
+ node.account.ssh("egrep -m 1 %s %s" % ("\"org\.apache\.kafka\.common\.protocol\.types\.SchemaException.*throttle_time_ms.*: java\.nio\.BufferUnderflowException\"", self.producer.LOG_FILE), allow_fail=False)
+
+ def test_consumer_back_compatibility(self):
+ """Run the scala 0.9.X consumer against an 0.8.X cluster.
+ Expect 0.9.X scala consumer to fail with buffer underflow. This error is the same as when an 0.9.X producer
+ is run against an 0.8.X broker: the broker responds to a V1 fetch request with a V0 fetch response; the
+ client then tries to parse this V0 fetch response as a V1 fetch response, resulting in a BufferUnderflowException
+ """
+ num_messages = 10
+ self.producer = VerifiableProducer(
+ self.test_context, self.num_producers, self.kafka, self.topic, max_messages=num_messages,
+ throughput=self.producer_throughput, version=LATEST_0_8_2)
+
+ self.consumer = ConsoleConsumer(
+ self.test_context, self.num_consumers, self.kafka, self.topic, group_id="consumer-09X",
+ consumer_timeout_ms=10000, message_validator=is_int, version=TRUNK)
+
+ self.old_consumer = ConsoleConsumer(
+ self.test_context, self.num_consumers, self.kafka, self.topic, group_id="consumer-08X",
+ consumer_timeout_ms=10000, message_validator=is_int, version=LATEST_0_8_2)
+
+ self.producer.run()
+ self.consumer.run()
+ self.old_consumer.run()
+
+ consumed = len(self.consumer.messages_consumed[1])
+ old_consumed = len(self.old_consumer.messages_consumed[1])
+ assert old_consumed == num_messages, "Expected 0.8.X scala consumer to consume %d, but only got %d" % (num_messages, old_consumed)
+ assert consumed == 0, "Expected 0.9.X scala consumer to fail to consume any messages, but got %d" % consumed
+
+ self.logger.info("Grepping consumer log for expected error type")
+ node = self.consumer.nodes[0]
+ node.account.ssh("egrep -m 1 %s %s" % ("\"java\.nio\.BufferUnderflowException\"", self.consumer.LOG_FILE), allow_fail=False)
+
+
+
http://git-wip-us.apache.org/repos/asf/kafka/blob/f154956a/tests/kafkatest/tests/upgrade_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/upgrade_test.py b/tests/kafkatest/tests/upgrade_test.py
index 97605cd..245129a 100644
--- a/tests/kafkatest/tests/upgrade_test.py
+++ b/tests/kafkatest/tests/upgrade_test.py
@@ -13,7 +13,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-
from kafkatest.services.zookeeper import ZookeeperService
from kafkatest.services.kafka import KafkaService
from kafkatest.services.kafka.version import LATEST_0_8_2, TRUNK
@@ -77,5 +76,3 @@ class TestUpgrade(ProduceConsumeValidateTest):
"""
self.run_produce_consume_validate(core_test_action=self.perform_upgrade)
-
-