You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2016/11/17 00:30:57 UTC

kafka git commit: KAFKA-4211; Update system test services to use the new consumer by default

Repository: kafka
Updated Branches:
  refs/heads/trunk 04a13e82a -> 249e41368


KAFKA-4211; Update system test services to use the new consumer by default

Update system test method signatures and method calls to use the new consumer by default.

Author: Vahid Hashemian <va...@us.ibm.com>

Reviewers: Jason Gustafson <ja...@confluent.io>

Closes #2060 from vahidhashemian/KAFKA-4211


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/249e4136
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/249e4136
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/249e4136

Branch: refs/heads/trunk
Commit: 249e4136852be49579eef53b2bce0b2506797666
Parents: 04a13e8
Author: Vahid Hashemian <va...@us.ibm.com>
Authored: Wed Nov 16 16:30:48 2016 -0800
Committer: Jason Gustafson <ja...@confluent.io>
Committed: Wed Nov 16 16:30:48 2016 -0800

----------------------------------------------------------------------
 .../sanity_checks/test_console_consumer.py      |  2 +-
 .../sanity_checks/test_performance_services.py  |  8 +++----
 tests/kafkatest/services/console_consumer.py    |  2 +-
 tests/kafkatest/services/kafka/kafka.py         |  4 ++--
 tests/kafkatest/services/mirror_maker.py        |  2 +-
 .../performance/consumer_performance.py         |  2 +-
 .../tests/client/message_format_change_test.py  |  2 +-
 tests/kafkatest/tests/client/quota_test.py      |  1 -
 tests/kafkatest/tests/connect/connect_test.py   |  2 +-
 .../core/compatibility_test_new_broker_test.py  | 22 ++++++++++----------
 .../tests/core/security_rolling_upgrade_test.py |  2 +-
 tests/kafkatest/tests/core/security_test.py     |  2 +-
 tests/kafkatest/tests/core/throttling_test.py   |  1 -
 tests/kafkatest/tests/core/upgrade_test.py      | 22 ++++++++++----------
 .../core/zookeeper_security_upgrade_test.py     |  2 +-
 15 files changed, 37 insertions(+), 39 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/249e4136/tests/kafkatest/sanity_checks/test_console_consumer.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/sanity_checks/test_console_consumer.py b/tests/kafkatest/sanity_checks/test_console_consumer.py
index 773a561..18cbfb7 100644
--- a/tests/kafkatest/sanity_checks/test_console_consumer.py
+++ b/tests/kafkatest/sanity_checks/test_console_consumer.py
@@ -37,7 +37,7 @@ class ConsoleConsumerTest(Test):
         self.zk = ZookeeperService(test_context, num_nodes=1)
         self.kafka = KafkaService(self.test_context, num_nodes=1, zk=self.zk,
                                   topics={self.topic: {"partitions": 1, "replication-factor": 1}})
-        self.consumer = ConsoleConsumer(self.test_context, num_nodes=1, kafka=self.kafka, topic=self.topic)
+        self.consumer = ConsoleConsumer(self.test_context, num_nodes=1, kafka=self.kafka, topic=self.topic, new_consumer=False)
 
     def setUp(self):
         self.zk.start()

http://git-wip-us.apache.org/repos/asf/kafka/blob/249e4136/tests/kafkatest/sanity_checks/test_performance_services.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/sanity_checks/test_performance_services.py b/tests/kafkatest/sanity_checks/test_performance_services.py
index 94a61bc..7b5946a 100644
--- a/tests/kafkatest/sanity_checks/test_performance_services.py
+++ b/tests/kafkatest/sanity_checks/test_performance_services.py
@@ -37,12 +37,12 @@ class PerformanceServiceTest(Test):
 
     # We are keeping 0.8.2 here so that we don't inadvertently break support for it. Since this is just a sanity check,
     # the overhead should be manageable.
-    @parametrize(version=str(LATEST_0_8_2))
+    @parametrize(version=str(LATEST_0_8_2), new_consumer=False)
     @parametrize(version=str(LATEST_0_9), new_consumer=False)
-    @parametrize(version=str(LATEST_0_9), new_consumer=True)
+    @parametrize(version=str(LATEST_0_9))
     @parametrize(version=str(TRUNK), new_consumer=False)
-    @parametrize(version=str(TRUNK), new_consumer=True)
-    def test_version(self, version=str(LATEST_0_9), new_consumer=False):
+    @parametrize(version=str(TRUNK))
+    def test_version(self, version=str(LATEST_0_9), new_consumer=True):
         """
         Sanity check out producer performance service - verify that we can run the service with a small
         number of messages. The actual stats here are pretty meaningless since the number of messages is quite small.

http://git-wip-us.apache.org/repos/asf/kafka/blob/249e4136/tests/kafkatest/services/console_consumer.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/console_consumer.py b/tests/kafkatest/services/console_consumer.py
index dae6095..050ea6d 100644
--- a/tests/kafkatest/services/console_consumer.py
+++ b/tests/kafkatest/services/console_consumer.py
@@ -87,7 +87,7 @@ class ConsoleConsumer(KafkaPathResolverMixin, JmxMixin, BackgroundThreadService)
             "collect_default": True}
     }
 
-    def __init__(self, context, num_nodes, kafka, topic, group_id="test-consumer-group", new_consumer=False,
+    def __init__(self, context, num_nodes, kafka, topic, group_id="test-consumer-group", new_consumer=True,
                  message_validator=None, from_beginning=True, consumer_timeout_ms=None, version=TRUNK,
                  client_id="console-consumer", print_key=False, jmx_object_names=None, jmx_attributes=None,
                  enable_systest_events=False, stop_timeout_sec=15):

http://git-wip-us.apache.org/repos/asf/kafka/blob/249e4136/tests/kafkatest/services/kafka/kafka.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/kafka/kafka.py b/tests/kafkatest/services/kafka/kafka.py
index 4ac53b1..c79f8c8 100644
--- a/tests/kafkatest/services/kafka/kafka.py
+++ b/tests/kafkatest/services/kafka/kafka.py
@@ -471,7 +471,7 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
         self.logger.info("Leader for topic %s and partition %d is now: %d" % (topic, partition, leader_idx))
         return self.get_node(leader_idx)
 
-    def list_consumer_groups(self, node=None, new_consumer=False, command_config=None):
+    def list_consumer_groups(self, node=None, new_consumer=True, command_config=None):
         """ Get list of consumer groups.
         """
         if node is None:
@@ -498,7 +498,7 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
         self.logger.debug(output)
         return output
 
-    def describe_consumer_group(self, group, node=None, new_consumer=False, command_config=None):
+    def describe_consumer_group(self, group, node=None, new_consumer=True, command_config=None):
         """ Describe a consumer group.
         """
         if node is None:

http://git-wip-us.apache.org/repos/asf/kafka/blob/249e4136/tests/kafkatest/services/mirror_maker.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/mirror_maker.py b/tests/kafkatest/services/mirror_maker.py
index 734452c..14af4cf 100644
--- a/tests/kafkatest/services/mirror_maker.py
+++ b/tests/kafkatest/services/mirror_maker.py
@@ -72,7 +72,7 @@ class MirrorMaker(KafkaPathResolverMixin, Service):
         }
 
     def __init__(self, context, num_nodes, source, target, whitelist=None, blacklist=None, num_streams=1,
-                 new_consumer=False, consumer_timeout_ms=None, offsets_storage="kafka",
+                 new_consumer=True, consumer_timeout_ms=None, offsets_storage="kafka",
                  offset_commit_interval_ms=60000, log_level="DEBUG", producer_interceptor_classes=None):
         """
         MirrorMaker mirrors messages from one or more source clusters to a single destination cluster.

http://git-wip-us.apache.org/repos/asf/kafka/blob/249e4136/tests/kafkatest/services/performance/consumer_performance.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/performance/consumer_performance.py b/tests/kafkatest/services/performance/consumer_performance.py
index a17545d..3e02a5b 100644
--- a/tests/kafkatest/services/performance/consumer_performance.py
+++ b/tests/kafkatest/services/performance/consumer_performance.py
@@ -70,7 +70,7 @@ class ConsumerPerformanceService(PerformanceService):
             "collect_default": True}
     }
 
-    def __init__(self, context, num_nodes, kafka, topic, messages, version=TRUNK, new_consumer=False, settings={}):
+    def __init__(self, context, num_nodes, kafka, topic, messages, version=TRUNK, new_consumer=True, settings={}):
         super(ConsumerPerformanceService, self).__init__(context, num_nodes)
         self.kafka = kafka
         self.security_config = kafka.security_config.client_config()

http://git-wip-us.apache.org/repos/asf/kafka/blob/249e4136/tests/kafkatest/tests/client/message_format_change_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/client/message_format_change_test.py b/tests/kafkatest/tests/client/message_format_change_test.py
index a1ebf22..a57c04b 100644
--- a/tests/kafkatest/tests/client/message_format_change_test.py
+++ b/tests/kafkatest/tests/client/message_format_change_test.py
@@ -48,7 +48,7 @@ class MessageFormatChangeTest(ProduceConsumeValidateTest):
                                            message_validator=is_int,
                                            version=KafkaVersion(producer_version))
         self.consumer = ConsoleConsumer(self.test_context, self.num_consumers, self.kafka,
-                                        self.topic, consumer_timeout_ms=30000,
+                                        self.topic, new_consumer=False, consumer_timeout_ms=30000,
                                         message_validator=is_int, version=KafkaVersion(consumer_version))
         self.consumer.group_id = group
         self.run_produce_consume_validate(lambda: wait_until(

http://git-wip-us.apache.org/repos/asf/kafka/blob/249e4136/tests/kafkatest/tests/client/quota_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/client/quota_test.py b/tests/kafkatest/tests/client/quota_test.py
index 2abd089..1d31569 100644
--- a/tests/kafkatest/tests/client/quota_test.py
+++ b/tests/kafkatest/tests/client/quota_test.py
@@ -141,7 +141,6 @@ class QuotaTest(Test):
 
         # Consume all messages
         consumer = ConsoleConsumer(self.test_context, consumer_num, self.kafka, self.topic,
-            new_consumer=True,
             consumer_timeout_ms=60000, client_id=consumer_client_id,
             jmx_object_names=['kafka.consumer:type=consumer-fetch-manager-metrics,client-id=%s' % consumer_client_id],
             jmx_attributes=['bytes-consumed-rate'])

http://git-wip-us.apache.org/repos/asf/kafka/blob/249e4136/tests/kafkatest/tests/connect/connect_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/connect/connect_test.py b/tests/kafkatest/tests/connect/connect_test.py
index 93f5734..83acb4a 100644
--- a/tests/kafkatest/tests/connect/connect_test.py
+++ b/tests/kafkatest/tests/connect/connect_test.py
@@ -83,7 +83,7 @@ class ConnectStandaloneFileTest(Test):
         self.source = ConnectStandaloneService(self.test_context, self.kafka, [self.INPUT_FILE, self.OFFSETS_FILE])
         self.sink = ConnectStandaloneService(self.test_context, self.kafka, [self.OUTPUT_FILE, self.OFFSETS_FILE])
         self.consumer_validator = ConsoleConsumer(self.test_context, 1, self.kafka, self.TOPIC,
-                                                  consumer_timeout_ms=1000, new_consumer=True)
+                                                  consumer_timeout_ms=1000)
 
 
         self.zk.start()

http://git-wip-us.apache.org/repos/asf/kafka/blob/249e4136/tests/kafkatest/tests/core/compatibility_test_new_broker_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/core/compatibility_test_new_broker_test.py b/tests/kafkatest/tests/core/compatibility_test_new_broker_test.py
index 7fe875d..d6a0a12 100644
--- a/tests/kafkatest/tests/core/compatibility_test_new_broker_test.py
+++ b/tests/kafkatest/tests/core/compatibility_test_new_broker_test.py
@@ -43,17 +43,17 @@ class ClientCompatibilityTestNewBroker(ProduceConsumeValidateTest):
         self.num_consumers = 1
         self.messages_per_producer = 1000
 
-    @parametrize(producer_version=str(LATEST_0_8_2), consumer_version=str(LATEST_0_8_2), compression_types=["none"], timestamp_type=None)
-    @parametrize(producer_version=str(LATEST_0_8_2), consumer_version=str(LATEST_0_9), compression_types=["none"], timestamp_type=None)
-    @parametrize(producer_version=str(LATEST_0_9), consumer_version=str(TRUNK), compression_types=["none"], timestamp_type=None)
-    @parametrize(producer_version=str(TRUNK), consumer_version=str(LATEST_0_9), compression_types=["none"], timestamp_type=None)
-    @parametrize(producer_version=str(LATEST_0_9), consumer_version=str(TRUNK), compression_types=["snappy"], new_consumer=True, timestamp_type=None)
-    @parametrize(producer_version=str(TRUNK), consumer_version=str(LATEST_0_9), compression_types=["snappy"], new_consumer=True, timestamp_type=str("CreateTime"))
-    @parametrize(producer_version=str(TRUNK), consumer_version=str(TRUNK), compression_types=["snappy"], new_consumer=True, timestamp_type=str("LogAppendTime"))
-    @parametrize(producer_version=str(LATEST_0_10_0), consumer_version=str(LATEST_0_10_0), compression_types=["snappy"], new_consumer=True, timestamp_type=str("LogAppendTime"))
-    @parametrize(producer_version=str(LATEST_0_9), consumer_version=str(LATEST_0_9), compression_types=["snappy"], new_consumer=True, timestamp_type=str("LogAppendTime"))
-    @parametrize(producer_version=str(TRUNK), consumer_version=str(TRUNK), compression_types=["none"], timestamp_type=str("LogAppendTime"))
-    def test_compatibility(self, producer_version, consumer_version, compression_types, new_consumer=False, timestamp_type=None):
+    @parametrize(producer_version=str(LATEST_0_8_2), consumer_version=str(LATEST_0_8_2), compression_types=["none"], new_consumer=False, timestamp_type=None)
+    @parametrize(producer_version=str(LATEST_0_8_2), consumer_version=str(LATEST_0_9), compression_types=["none"], new_consumer=False, timestamp_type=None)
+    @parametrize(producer_version=str(LATEST_0_9), consumer_version=str(TRUNK), compression_types=["none"], new_consumer=False, timestamp_type=None)
+    @parametrize(producer_version=str(TRUNK), consumer_version=str(LATEST_0_9), compression_types=["none"], new_consumer=False, timestamp_type=None)
+    @parametrize(producer_version=str(LATEST_0_9), consumer_version=str(TRUNK), compression_types=["snappy"], timestamp_type=None)
+    @parametrize(producer_version=str(TRUNK), consumer_version=str(LATEST_0_9), compression_types=["snappy"], timestamp_type=str("CreateTime"))
+    @parametrize(producer_version=str(TRUNK), consumer_version=str(TRUNK), compression_types=["snappy"], timestamp_type=str("LogAppendTime"))
+    @parametrize(producer_version=str(LATEST_0_10_0), consumer_version=str(LATEST_0_10_0), compression_types=["snappy"], timestamp_type=str("LogAppendTime"))
+    @parametrize(producer_version=str(LATEST_0_9), consumer_version=str(LATEST_0_9), compression_types=["snappy"], timestamp_type=str("LogAppendTime"))
+    @parametrize(producer_version=str(TRUNK), consumer_version=str(TRUNK), compression_types=["none"], new_consumer=False, timestamp_type=str("LogAppendTime"))
+    def test_compatibility(self, producer_version, consumer_version, compression_types, new_consumer=True, timestamp_type=None):
        
         self.kafka = KafkaService(self.test_context, num_nodes=3, zk=self.zk, version=TRUNK, topics={self.topic: {
                                                                     "partitions": 3,

http://git-wip-us.apache.org/repos/asf/kafka/blob/249e4136/tests/kafkatest/tests/core/security_rolling_upgrade_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/core/security_rolling_upgrade_test.py b/tests/kafkatest/tests/core/security_rolling_upgrade_test.py
index e14d001..51b2e60 100644
--- a/tests/kafkatest/tests/core/security_rolling_upgrade_test.py
+++ b/tests/kafkatest/tests/core/security_rolling_upgrade_test.py
@@ -54,7 +54,7 @@ class TestSecurityRollingUpgrade(ProduceConsumeValidateTest):
 
         self.consumer = ConsoleConsumer(
             self.test_context, self.num_consumers, self.kafka, self.topic,
-            consumer_timeout_ms=60000, message_validator=is_int, new_consumer=True)
+            consumer_timeout_ms=60000, message_validator=is_int)
 
         self.consumer.group_id = "group"
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/249e4136/tests/kafkatest/tests/core/security_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/core/security_test.py b/tests/kafkatest/tests/core/security_test.py
index 8c150a2..b6bc656 100644
--- a/tests/kafkatest/tests/core/security_test.py
+++ b/tests/kafkatest/tests/core/security_test.py
@@ -102,5 +102,5 @@ class SecurityTest(ProduceConsumeValidateTest):
 
     def create_producer_and_consumer(self):
         self.producer = VerifiableProducer(self.test_context, self.num_producers, self.kafka, self.topic, throughput=self.producer_throughput)
-        self.consumer = ConsoleConsumer(self.test_context, self.num_consumers, self.kafka, self.topic, new_consumer=True, consumer_timeout_ms=10000, message_validator=is_int)
+        self.consumer = ConsoleConsumer(self.test_context, self.num_consumers, self.kafka, self.topic, consumer_timeout_ms=10000, message_validator=is_int)
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/249e4136/tests/kafkatest/tests/core/throttling_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/core/throttling_test.py b/tests/kafkatest/tests/core/throttling_test.py
index efc36d4..2e21322 100644
--- a/tests/kafkatest/tests/core/throttling_test.py
+++ b/tests/kafkatest/tests/core/throttling_test.py
@@ -163,7 +163,6 @@ class ThrottlingTest(ProduceConsumeValidateTest):
                                         self.num_consumers,
                                         self.kafka,
                                         self.topic,
-                                        new_consumer=True,
                                         consumer_timeout_ms=60000,
                                         message_validator=is_int,
                                         from_beginning=False)

http://git-wip-us.apache.org/repos/asf/kafka/blob/249e4136/tests/kafkatest/tests/core/upgrade_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/core/upgrade_test.py b/tests/kafkatest/tests/core/upgrade_test.py
index 062f3e5..15a9696 100644
--- a/tests/kafkatest/tests/core/upgrade_test.py
+++ b/tests/kafkatest/tests/core/upgrade_test.py
@@ -60,21 +60,21 @@ class TestUpgrade(ProduceConsumeValidateTest):
                 node.config[config_property.MESSAGE_FORMAT_VERSION] = to_message_format_version
             self.kafka.start_node(node)
 
+    @parametrize(from_kafka_version=str(LATEST_0_10_0), to_message_format_version=None, compression_types=["snappy"], new_consumer=False)
     @parametrize(from_kafka_version=str(LATEST_0_10_0), to_message_format_version=None, compression_types=["snappy"])
-    @parametrize(from_kafka_version=str(LATEST_0_10_0), to_message_format_version=None, compression_types=["snappy"], new_consumer=True)
-    @parametrize(from_kafka_version=str(LATEST_0_9), to_message_format_version=None, compression_types=["none"])
-    @parametrize(from_kafka_version=str(LATEST_0_9), to_message_format_version=None, compression_types=["none"], new_consumer=True, security_protocol="SASL_SSL")
-    @parametrize(from_kafka_version=str(LATEST_0_9), to_message_format_version=None, compression_types=["snappy"], new_consumer=True)
+    @parametrize(from_kafka_version=str(LATEST_0_9), to_message_format_version=None, compression_types=["none"], new_consumer=False)
+    @parametrize(from_kafka_version=str(LATEST_0_9), to_message_format_version=None, compression_types=["none"], security_protocol="SASL_SSL")
+    @parametrize(from_kafka_version=str(LATEST_0_9), to_message_format_version=None, compression_types=["snappy"])
+    @parametrize(from_kafka_version=str(LATEST_0_9), to_message_format_version=None, compression_types=["lz4"], new_consumer=False)
     @parametrize(from_kafka_version=str(LATEST_0_9), to_message_format_version=None, compression_types=["lz4"])
-    @parametrize(from_kafka_version=str(LATEST_0_9), to_message_format_version=None, compression_types=["lz4"], new_consumer=True)
-    @parametrize(from_kafka_version=str(LATEST_0_9), to_message_format_version=str(LATEST_0_9), compression_types=["none"])
-    @parametrize(from_kafka_version=str(LATEST_0_9), to_message_format_version=str(LATEST_0_9), compression_types=["snappy"], new_consumer=True)
+    @parametrize(from_kafka_version=str(LATEST_0_9), to_message_format_version=str(LATEST_0_9), compression_types=["none"], new_consumer=False)
+    @parametrize(from_kafka_version=str(LATEST_0_9), to_message_format_version=str(LATEST_0_9), compression_types=["snappy"])
+    @parametrize(from_kafka_version=str(LATEST_0_9), to_message_format_version=str(LATEST_0_9), compression_types=["lz4"], new_consumer=False)
     @parametrize(from_kafka_version=str(LATEST_0_9), to_message_format_version=str(LATEST_0_9), compression_types=["lz4"])
-    @parametrize(from_kafka_version=str(LATEST_0_9), to_message_format_version=str(LATEST_0_9), compression_types=["lz4"], new_consumer=True)
-    @parametrize(from_kafka_version=str(LATEST_0_8_2), to_message_format_version=None, compression_types=["none"])
-    @parametrize(from_kafka_version=str(LATEST_0_8_2), to_message_format_version=None, compression_types=["snappy"])
+    @parametrize(from_kafka_version=str(LATEST_0_8_2), to_message_format_version=None, compression_types=["none"], new_consumer=False)
+    @parametrize(from_kafka_version=str(LATEST_0_8_2), to_message_format_version=None, compression_types=["snappy"], new_consumer=False)
     def test_upgrade(self, from_kafka_version, to_message_format_version, compression_types,
-                     new_consumer=False, security_protocol="PLAINTEXT"):
+                     new_consumer=True, security_protocol="PLAINTEXT"):
         """Test upgrade of Kafka broker cluster from 0.8.2, 0.9.0 or 0.10.0 to the current version
 
         from_kafka_version is a Kafka version to upgrade from: either 0.8.2.X, 0.9.0.x or 0.10.0.x

http://git-wip-us.apache.org/repos/asf/kafka/blob/249e4136/tests/kafkatest/tests/core/zookeeper_security_upgrade_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/core/zookeeper_security_upgrade_test.py b/tests/kafkatest/tests/core/zookeeper_security_upgrade_test.py
index 582eb68..0cfdf16 100644
--- a/tests/kafkatest/tests/core/zookeeper_security_upgrade_test.py
+++ b/tests/kafkatest/tests/core/zookeeper_security_upgrade_test.py
@@ -52,7 +52,7 @@ class ZooKeeperSecurityUpgradeTest(ProduceConsumeValidateTest):
 
         self.consumer = ConsoleConsumer(
             self.test_context, self.num_consumers, self.kafka, self.topic,
-            consumer_timeout_ms=60000, message_validator=is_int, new_consumer=True)
+            consumer_timeout_ms=60000, message_validator=is_int)
 
         self.consumer.group_id = self.group