You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2019/08/13 19:33:21 UTC
[kafka] branch trunk updated: MINOR: Add fetch from follower system
test (#7166)
This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new ff9e95c MINOR: Add fetch from follower system test (#7166)
ff9e95c is described below
commit ff9e95cb09907739c17b4f4681b11c525515b995
Author: David Arthur <mu...@gmail.com>
AuthorDate: Tue Aug 13 15:33:05 2019 -0400
MINOR: Add fetch from follower system test (#7166)
This adds a basic system test that enables rack-aware brokers with the rack-aware replica selector for fetch from followers (KIP-392). The test asserts that the follower was read from at least once and that all the messages that were produced were successfully consumed.
Reviewers: Jason Gustafson <ja...@confluent.io>
---
.../tests/core/fetch_from_follower_test.py | 144 +++++++++++++++++++++
1 file changed, 144 insertions(+)
diff --git a/tests/kafkatest/tests/core/fetch_from_follower_test.py b/tests/kafkatest/tests/core/fetch_from_follower_test.py
new file mode 100644
index 0000000..fde1baf
--- /dev/null
+++ b/tests/kafkatest/tests/core/fetch_from_follower_test.py
@@ -0,0 +1,144 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import time
+from collections import defaultdict
+
+from ducktape.mark.resource import cluster
+
+from kafkatest.directory_layout.kafka_path import KafkaPathResolverMixin
+from kafkatest.services.console_consumer import ConsoleConsumer
+from kafkatest.services.kafka import KafkaService
+from kafkatest.services.monitor.jmx import JmxMixin
+from kafkatest.services.verifiable_producer import VerifiableProducer
+from kafkatest.services.zookeeper import ZookeeperService
+from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
+from kafkatest.utils import is_int
+
+
+class JmxTool(JmxMixin, KafkaPathResolverMixin):
+ """
+ Simple helper class for using the JmxTool directly instead of as a mix-in
+ """
+ def __init__(self, text_context, *args, **kwargs):
+ JmxMixin.__init__(self, num_nodes=1, *args, **kwargs)
+ self.context = text_context
+
+ @property
+ def logger(self):
+ return self.context.logger
+
+
+class FetchFromFollowerTest(ProduceConsumeValidateTest):
+
+ RACK_AWARE_REPLICA_SELECTOR = "org.apache.kafka.common.replica.RackAwareReplicaSelector"
+ METADATA_MAX_AGE_MS = 3000
+
+ def __init__(self, test_context):
+ super(FetchFromFollowerTest, self).__init__(test_context=test_context)
+ self.jmx_tool = JmxTool(test_context, jmx_poll_ms=100)
+ self.topic = "test_topic"
+ self.zk = ZookeeperService(test_context, num_nodes=1)
+ self.kafka = KafkaService(test_context,
+ num_nodes=3,
+ zk=self.zk,
+ topics={
+ self.topic: {
+ "partitions": 1,
+ "replication-factor": 3,
+ "configs": {"min.insync.replicas": 1}},
+ },
+ server_prop_overides=[
+ ["replica.selector.class", self.RACK_AWARE_REPLICA_SELECTOR]
+ ],
+ per_node_server_prop_overrides={
+ 1: [("broker.rack", "rack-a")],
+ 2: [("broker.rack", "rack-b")],
+ 3: [("broker.rack", "rack-c")]
+ })
+
+ self.producer_throughput = 1000
+ self.num_producers = 1
+ self.num_consumers = 1
+
+ def min_cluster_size(self):
+ return super(FetchFromFollowerTest, self).min_cluster_size() + self.num_producers * 2 + self.num_consumers * 2
+
+ def setUp(self):
+ self.zk.start()
+ self.kafka.start()
+
+ @cluster(num_nodes=9)
+ def test_consumer_preferred_read_replica(self):
+ """
+ This test starts up brokers with "broker.rack" and "replica.selector.class" configurations set. The replica
+ selector is set to the rack-aware implementation. One of the brokers has a different rack than the other two.
+ We then use a console consumer with the "client.rack" set to the same value as the differing broker. After
+ producing some records, we verify that the client has been informed of the preferred replica and that all the
+ records are properly consumed.
+ """
+
+ # Find the leader, configure consumer to be on a different rack
+ leader_node = self.kafka.leader(self.topic, 0)
+ leader_idx = self.kafka.idx(leader_node)
+ non_leader_idx = 2 if leader_idx != 2 else 1
+ non_leader_rack = "rack-b" if leader_idx != 2 else "rack-a"
+
+ self.logger.debug("Leader %d %s" % (leader_idx, leader_node))
+ self.logger.debug("Non-Leader %d %s" % (non_leader_idx, non_leader_rack))
+
+ self.producer = VerifiableProducer(self.test_context, self.num_producers, self.kafka, self.topic,
+ throughput=self.producer_throughput)
+ self.consumer = ConsoleConsumer(self.test_context, self.num_consumers, self.kafka, self.topic,
+ client_id="console-consumer", group_id="test-consumer-group-1",
+ consumer_timeout_ms=60000, message_validator=is_int,
+ consumer_properties={"client.rack": non_leader_rack, "metadata.max.age.ms": self.METADATA_MAX_AGE_MS})
+
+ # Start up and let some data get produced
+ self.start_producer_and_consumer()
+ time.sleep(self.METADATA_MAX_AGE_MS * 2. / 1000)
+
+ consumer_node = self.consumer.nodes[0]
+ consumer_idx = self.consumer.idx(consumer_node)
+ read_replica_attribute = "preferred-read-replica"
+ read_replica_mbean = "kafka.consumer:type=consumer-fetch-manager-metrics,client-id=%s,topic=%s,partition=%d" % \
+ ("console-consumer", self.topic, 0)
+ self.jmx_tool.jmx_object_names = [read_replica_mbean]
+ self.jmx_tool.jmx_attributes = [read_replica_attribute]
+ self.jmx_tool.start_jmx_tool(consumer_idx, consumer_node)
+
+ # Wait for at least one interval of "metadata.max.age.ms"
+ time.sleep(self.METADATA_MAX_AGE_MS * 2. / 1000)
+
+ # Read the JMX output
+ self.jmx_tool.read_jmx_output(consumer_idx, consumer_node)
+
+ all_captured_preferred_read_replicas = defaultdict(int)
+ self.logger.debug(self.jmx_tool.jmx_stats)
+
+ for ts, data in self.jmx_tool.jmx_stats[0].items():
+ for k, v in data.items():
+ if k.endswith(read_replica_attribute):
+ all_captured_preferred_read_replicas[int(v)] += 1
+
+ self.logger.debug("Saw the following preferred read replicas %s",
+ dict(all_captured_preferred_read_replicas.items()))
+
+ assert all_captured_preferred_read_replicas[non_leader_idx] > 0, \
+ "Expected to see broker %d (%s) as a preferred replica" % (non_leader_idx, non_leader_rack)
+
+ # Validate consumed messages
+ self.stop_producer_and_consumer()
+ self.validate()