You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2016/05/11 22:48:57 UTC

kafka git commit: KAFKA-3694; Ensure broker Zk deregistration prior to restart in ReplicationTest

Repository: kafka
Updated Branches:
  refs/heads/trunk b28bc57a1 -> f892f0ca6


KAFKA-3694; Ensure broker Zk deregistration prior to restart in ReplicationTest

Author: Jason Gustafson <ja...@confluent.io>

Reviewers: Geoff Anderson <ge...@confluent.io>, Ismael Juma <is...@juma.me.uk>

Closes #1365 from hachikuji/KAFKA-3694


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/f892f0ca
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/f892f0ca
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/f892f0ca

Branch: refs/heads/trunk
Commit: f892f0ca6d38cb21a93c2c05dd8b9a23c4165181
Parents: b28bc57
Author: Jason Gustafson <ja...@confluent.io>
Authored: Wed May 11 23:48:46 2016 +0100
Committer: Ismael Juma <is...@juma.me.uk>
Committed: Wed May 11 23:48:46 2016 +0100

----------------------------------------------------------------------
 tests/kafkatest/services/kafka/kafka.py             | 16 +++++++++++++++-
 .../services/kafka/templates/kafka.properties       |  1 +
 tests/kafkatest/tests/core/replication_test.py      | 11 ++++-------
 tests/kafkatest/tests/produce_consume_validate.py   |  2 +-
 4 files changed, 21 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/f892f0ca/tests/kafkatest/services/kafka/kafka.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/kafka/kafka.py b/tests/kafkatest/services/kafka/kafka.py
index 334069d..a843a12 100644
--- a/tests/kafkatest/services/kafka/kafka.py
+++ b/tests/kafkatest/services/kafka/kafka.py
@@ -66,7 +66,7 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
     def __init__(self, context, num_nodes, zk, security_protocol=SecurityConfig.PLAINTEXT, interbroker_security_protocol=SecurityConfig.PLAINTEXT,
                  client_sasl_mechanism=SecurityConfig.SASL_MECHANISM_GSSAPI, interbroker_sasl_mechanism=SecurityConfig.SASL_MECHANISM_GSSAPI,
                  authorizer_class_name=None, topics=None, version=TRUNK, quota_config=None, jmx_object_names=None,
-                 jmx_attributes=[], zk_connect_timeout=5000):
+                 jmx_attributes=[], zk_connect_timeout=5000, zk_session_timeout=6000):
         """
         :type context
         :type zk: ZookeeperService
@@ -99,6 +99,11 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
         # for this constructor.
         self.zk_connect_timeout = zk_connect_timeout
 
+        # Also allow the session timeout to be provided explicitly,
+        # primarily so that test cases can depend on it when waiting,
+        # e.g., for brokers to deregister after a hard kill.
+        self.zk_session_timeout = zk_session_timeout
+
         self.port_mappings = {
             'PLAINTEXT': Port('PLAINTEXT', 9092, False),
             'SSL': Port('SSL', 9093, False),
@@ -513,6 +518,15 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
         self.logger.info("Controller's ID: %d" % (controller_idx))
         return self.get_node(controller_idx)
 
+    def is_registered(self, node):
+        """
+        Check whether a broker is registered in Zookeeper
+        """
+        self.logger.debug("Querying zookeeper to see if broker %s is registered", node)
+        broker_info = self.zk.query("/brokers/ids/%s" % self.idx(node))
+        self.logger.debug("Broker info: %s", broker_info)
+        return broker_info is not None
+
     def get_offset_shell(self, topic, partitions, max_wait_ms, offsets, time):
         node = self.nodes[0]
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/f892f0ca/tests/kafkatest/services/kafka/templates/kafka.properties
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/kafka/templates/kafka.properties b/tests/kafkatest/services/kafka/templates/kafka.properties
index 1e4f17c..1f23713 100644
--- a/tests/kafkatest/services/kafka/templates/kafka.properties
+++ b/tests/kafkatest/services/kafka/templates/kafka.properties
@@ -72,6 +72,7 @@ zookeeper.set.acl={{zk_set_acl}}
 {% endif %}
 
 zookeeper.connection.timeout.ms={{ zk_connect_timeout }}
+zookeeper.session.timeout.ms={{ zk_session_timeout }}
 
 {% if replica_lag is defined %}
 replica.lag.time.max.ms={{replica_lag}}

http://git-wip-us.apache.org/repos/asf/kafka/blob/f892f0ca/tests/kafkatest/tests/core/replication_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/core/replication_test.py b/tests/kafkatest/tests/core/replication_test.py
index 8e9474a..f815034 100644
--- a/tests/kafkatest/tests/core/replication_test.py
+++ b/tests/kafkatest/tests/core/replication_test.py
@@ -65,15 +65,12 @@ def hard_bounce(test, broker_type):
         test.kafka.signal_node(prev_broker_node, sig=signal.SIGKILL)
 
         # Since this is a hard kill, we need to make sure the process is down and that
-        # zookeeper and the broker cluster have registered the loss of the leader/controller.
-        # Waiting for a new leader for the topic-partition/controller to be elected is a reasonable heuristic for this.
+        # zookeeper has registered the loss by expiring the broker's session.
 
-        def role_reassigned():
-            current_elected_broker = broker_node(test, broker_type)
-            return current_elected_broker is not None and current_elected_broker != prev_broker_node
+        wait_until(lambda: len(test.kafka.pids(prev_broker_node)) == 0 and not test.kafka.is_registered(prev_broker_node),
+                   timeout_sec=test.kafka.zk_session_timeout + 5,
+                   err_msg="Failed to see timely deregistration of hard-killed broker %s" % str(prev_broker_node.account))
 
-        wait_until(lambda: len(test.kafka.pids(prev_broker_node)) == 0, timeout_sec=5)
-        wait_until(role_reassigned, timeout_sec=10, backoff_sec=.5)
         test.kafka.start_node(prev_broker_node)
 
 failures = {

http://git-wip-us.apache.org/repos/asf/kafka/blob/f892f0ca/tests/kafkatest/tests/produce_consume_validate.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/produce_consume_validate.py b/tests/kafkatest/tests/produce_consume_validate.py
index 425b816..a5da7be 100644
--- a/tests/kafkatest/tests/produce_consume_validate.py
+++ b/tests/kafkatest/tests/produce_consume_validate.py
@@ -35,7 +35,7 @@ class ProduceConsumeValidateTest(Test):
     def start_producer_and_consumer(self):
         # Start background producer and consumer
         self.producer.start()
-        wait_until(lambda: self.producer.num_acked > 5, timeout_sec=10,
+        wait_until(lambda: self.producer.num_acked > 5, timeout_sec=20,
              err_msg="Producer failed to start in a reasonable amount of time.")
         self.consumer.start()
         wait_until(lambda: len(self.consumer.messages_consumed[1]) > 0, timeout_sec=60,