You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2018/03/06 19:19:46 UTC

[kafka] branch trunk updated: MINOR: Do not start processor for bounce-at-start (#4639)

This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new e5d6c9a  MINOR: Do not start processor for bounce-at-start (#4639)
e5d6c9a is described below

commit e5d6c9a79a4ca9b82502b8e7f503d86ddaddb7fb
Author: Guozhang Wang <wa...@gmail.com>
AuthorDate: Tue Mar 6 11:19:38 2018 -0800

    MINOR: Do not start processor for bounce-at-start (#4639)
    
    Only start it after the broker has been shut down.
---
 .../kafka/common/requests/MetadataResponse.java      |  8 ++++++++
 .../tests/streams/streams_broker_bounce_test.py      | 20 ++++++++++++--------
 2 files changed, 20 insertions(+), 8 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
index cda3c07..de7c8f6 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
@@ -431,6 +431,14 @@ public class MetadataResponse extends AbstractResponse {
             return partitionMetadata;
         }
 
+        @Override
+        public String toString() {
+            return "(type=TopicMetadata" +
+                    ", error=" + error +
+                    ", topic=" + topic +
+                    ", isInternal=" + isInternal +
+                    ", partitionMetadata=" + partitionMetadata + ')';
+        }
     }
 
     // This is used to describe per-partition state in the MetadataResponse
diff --git a/tests/kafkatest/tests/streams/streams_broker_bounce_test.py b/tests/kafkatest/tests/streams/streams_broker_bounce_test.py
index 15d67b9..1415ecc 100644
--- a/tests/kafkatest/tests/streams/streams_broker_bounce_test.py
+++ b/tests/kafkatest/tests/streams/streams_broker_bounce_test.py
@@ -112,6 +112,8 @@ class StreamsBrokerBounceTest(Test):
             'wcnt' : { 'partitions': self.partitions, 'replication-factor': self.replication,
                        'configs': {"min.insync.replicas": 2} },
             'tagg' : { 'partitions': self.partitions, 'replication-factor': self.replication,
+                       'configs': {"min.insync.replicas": 2} },
+            '__consumer_offsets' : { 'partitions': 50, 'replication-factor': self.replication,
                        'configs': {"min.insync.replicas": 2} }
         }
 
@@ -132,21 +134,21 @@ class StreamsBrokerBounceTest(Test):
             signal_node(self, self.kafka.nodes[num], sig)
 
         
-    def setup_system(self):
-         # Setup phase
+    def setup_system(self, start_processor=True):
+        # Setup phase
         self.zk = ZookeeperService(self.test_context, num_nodes=1)
         self.zk.start()
-        
-        self.kafka = KafkaService(self.test_context, num_nodes=self.replication,
-                                  zk=self.zk, topics=self.topics)
+
+        self.kafka = KafkaService(self.test_context, num_nodes=self.replication, zk=self.zk, topics=self.topics)
         self.kafka.start()
         # Start test harness
         self.driver = StreamsSmokeTestDriverService(self.test_context, self.kafka)
         self.processor1 = StreamsSmokeTestJobRunnerService(self.test_context, self.kafka)
 
-        
         self.driver.start()
-        self.processor1.start()
+
+        if (start_processor):
+           self.processor1.start()
 
     def collect_results(self, sleep_time_secs):
         data = {}
@@ -210,7 +212,7 @@ class StreamsBrokerBounceTest(Test):
         Streams should throw an exception since it cannot create topics with the desired
         replication factor of 3
         """
-        self.setup_system() 
+        self.setup_system(start_processor=False)
 
         # Sleep to allow test to run for a bit
         time.sleep(sleep_time_secs)
@@ -218,6 +220,8 @@ class StreamsBrokerBounceTest(Test):
         # Fail brokers
         self.fail_broker_type(failure_mode, broker_type)
 
+        self.processor1.start()
+
         return self.collect_results(sleep_time_secs)
 
     @cluster(num_nodes=7)

-- 
To stop receiving notification emails like this one, please contact
guozhang@apache.org.