You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2018/04/28 05:13:04 UTC

[kafka] branch trunk updated: MINOR: update VerifiableProducer to send keys if configured and removed StreamsRepeatingKeyProducerService (#4841)

This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new c6fd3d4  MINOR: update VerifiableProducer to send keys if configured and removed StreamsRepeatingKeyProducerService (#4841)
c6fd3d4 is described below

commit c6fd3d488e8dba904f6abb2324a89466fc354387
Author: Bill Bejeck <bb...@gmail.com>
AuthorDate: Sat Apr 28 01:12:57 2018 -0400

    MINOR: update VerifiableProducer to send keys if configured and removed StreamsRepeatingKeyProducerService (#4841)
    
    This PR does the following:
    
    * Remove the StreamsRepeatingIntegerKeyProducerService and the associated Java class
    * Add a parameter to VerifiableProducer.java to enable sending keys when specified
    * Update the corresponding Python file verifiable_producer.py to support the new parameter.
    
    Reviewers: Matthias J Sax <ma...@confluentio>, Guozhang Wang <wa...@gmail.com>
---
 .../tests/StreamsRepeatingIntegerKeyProducer.java  | 123 ---------------------
 tests/kafkatest/services/streams.py                |   6 -
 tests/kafkatest/services/verifiable_producer.py    |  11 +-
 tests/kafkatest/tests/streams/base_streams_test.py |   5 +-
 .../tests/streams/streams_standby_replica_test.py  |  19 ++--
 .../org/apache/kafka/tools/VerifiableProducer.java |  54 ++++++---
 6 files changed, 61 insertions(+), 157 deletions(-)

diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsRepeatingIntegerKeyProducer.java b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsRepeatingIntegerKeyProducer.java
deleted file mode 100644
index 15a9fa0..0000000
--- a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsRepeatingIntegerKeyProducer.java
+++ /dev/null
@@ -1,123 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.tests;
-
-import org.apache.kafka.clients.producer.Callback;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.ProducerConfig;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.clients.producer.RecordMetadata;
-import org.apache.kafka.common.errors.TimeoutException;
-import org.apache.kafka.common.serialization.StringSerializer;
-import org.apache.kafka.common.utils.Utils;
-import org.apache.kafka.streams.StreamsConfig;
-
-import java.io.IOException;
-import java.util.Map;
-import java.util.Properties;
-
-/**
- * Utility class used to send messages with integer keys
- * repeating in sequence every 1000 messages.  Multiple topics for publishing
- * can be provided in the config map with key of 'topics' and ';' delimited list of output topics
- */
-public class StreamsRepeatingIntegerKeyProducer {
-
-    private static volatile boolean keepProducing = true;
-    private volatile static int messageCounter = 0;
-
-    public static void main(final String[] args) throws IOException {
-        if (args.length < 2) {
-            System.err.println("StreamsStandByReplicaTest are expecting two parameters: propFile, additionalConfigs; but only see " + args.length + " parameter");
-            System.exit(1);
-        }
-
-        System.out.println("StreamsTest instance started");
-
-        final String propFileName = args[0];
-        final String configString = args[1];
-
-        final Properties streamsProperties = Utils.loadProps(propFileName);
-        final String kafka = streamsProperties.getProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG);
-
-        if (kafka == null) {
-            System.err.println("No bootstrap kafka servers specified in " + StreamsConfig.BOOTSTRAP_SERVERS_CONFIG);
-            System.exit(1);
-        }
-
-        final Map<String, String> configs = SystemTestUtil.parseConfigs(configString);
-        System.out.println("Using provided configs " + configs);
-
-        final int numMessages = configs.containsKey("num_messages") ? Integer.parseInt(configs.get("num_messages")) : 1000;
-
-        final Properties producerProps = new Properties();
-        producerProps.put(ProducerConfig.CLIENT_ID_CONFIG, "StreamsRepeatingIntegerKeyProducer");
-        producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
-        producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
-        producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
-        producerProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
-
-        final String value = "testingValue";
-        Integer key = 0;
-
-        Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
-            @Override
-            public void run() {
-                keepProducing = false;
-            }
-        }));
-
-        final String[] topics = configs.get("topics").split(";");
-        final int totalMessagesToProduce = numMessages * topics.length;
-
-        try (final KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(producerProps)) {
-
-            while (keepProducing && messageCounter < totalMessagesToProduce) {
-                for (final String topic : topics) {
-                    final ProducerRecord<String, String> producerRecord = new ProducerRecord<>(topic, key.toString(), value + key);
-                    kafkaProducer.send(producerRecord, new Callback() {
-                        @Override
-                        public void onCompletion(final RecordMetadata metadata, final Exception exception) {
-                            if (exception != null) {
-                                exception.printStackTrace(System.err);
-                                System.err.flush();
-                                if (exception instanceof TimeoutException) {
-                                    try {
-                                        // message == org.apache.kafka.common.errors.TimeoutException: Expiring 4 record(s) for data-0: 30004 ms has passed since last attempt plus backoff time
-                                        final int expired = Integer.parseInt(exception.getMessage().split(" ")[2]);
-                                        messageCounter -= expired;
-                                    } catch (final Exception ignore) {
-                                    }
-                                }
-                            }
-                        }
-                    });
-                    messageCounter += 1;
-                }
-                key += 1;
-                if (key % 1000 == 0) {
-                    System.out.println("Sent 1000 messages");
-                    Utils.sleep(100);
-                    key = 0;
-                }
-            }
-        }
-        System.out.println("Producer shut down now, sent total " + messageCounter  + " of requested " + totalMessagesToProduce);
-        System.out.flush();
-    }
-}
diff --git a/tests/kafkatest/services/streams.py b/tests/kafkatest/services/streams.py
index ec6a081..f268ab8 100644
--- a/tests/kafkatest/services/streams.py
+++ b/tests/kafkatest/services/streams.py
@@ -397,12 +397,6 @@ class StreamsStandbyTaskService(StreamsTestBaseService):
                                                         configs)
 
 
-class StreamsRepeatingIntegerKeyProducerService(StreamsTestBaseService):
-    def __init__(self, test_context, kafka, configs):
-        super(StreamsRepeatingIntegerKeyProducerService, self).__init__(test_context,
-                                                                        kafka,
-                                                                        "org.apache.kafka.streams.tests.StreamsRepeatingIntegerKeyProducer",
-                                                                        configs)
 class StreamsUpgradeTestJobRunnerService(StreamsTestBaseService):
     def __init__(self, test_context, kafka):
         super(StreamsUpgradeTestJobRunnerService, self).__init__(test_context,
diff --git a/tests/kafkatest/services/verifiable_producer.py b/tests/kafkatest/services/verifiable_producer.py
index 17f1ec3..cbce27e 100644
--- a/tests/kafkatest/services/verifiable_producer.py
+++ b/tests/kafkatest/services/verifiable_producer.py
@@ -15,16 +15,16 @@
 
 import json
 import os
-import time
 
-from ducktape.services.background_thread import BackgroundThreadService
+import time
 from ducktape.cluster.remoteaccount import RemoteCommandError
-
+from ducktape.services.background_thread import BackgroundThreadService
 from kafkatest.directory_layout.kafka_path import KafkaPathResolverMixin
 from kafkatest.services.verifiable_client import VerifiableClientMixin
 from kafkatest.utils import is_int, is_int_with_prefix
 from kafkatest.version import DEV_BRANCH
 
+
 class VerifiableProducer(KafkaPathResolverMixin, VerifiableClientMixin, BackgroundThreadService):
     """This service wraps org.apache.kafka.tools.VerifiableProducer for use in
     system testing.
@@ -57,7 +57,7 @@ class VerifiableProducer(KafkaPathResolverMixin, VerifiableClientMixin, Backgrou
     def __init__(self, context, num_nodes, kafka, topic, max_messages=-1, throughput=100000,
                  message_validator=is_int, compression_types=None, version=DEV_BRANCH, acks=None,
                  stop_timeout_sec=150, request_timeout_sec=30, log_level="INFO",
-                 enable_idempotence=False, offline_nodes=[], create_time=-1):
+                 enable_idempotence=False, offline_nodes=[], create_time=-1, repeating_keys=None):
         """
         :param max_messages is a number of messages to be produced per producer
         :param message_validator checks for an expected format of messages produced. There are
@@ -93,6 +93,7 @@ class VerifiableProducer(KafkaPathResolverMixin, VerifiableClientMixin, Backgrou
         self.enable_idempotence = enable_idempotence
         self.offline_nodes = offline_nodes
         self.create_time = create_time
+        self.repeating_keys = repeating_keys
 
     def java_class_name(self):
         return "VerifiableProducer"
@@ -198,6 +199,8 @@ class VerifiableProducer(KafkaPathResolverMixin, VerifiableClientMixin, Backgrou
             cmd += " --acks %s " % str(self.acks)
         if self.create_time > -1:
             cmd += " --message-create-time %s " % str(self.create_time)
+        if self.repeating_keys is not None:
+            cmd += " --repeating-keys %s " % str(self.repeating_keys)
 
         cmd += " --producer.config %s" % VerifiableProducer.CONFIG_FILE
         cmd += " 2>> %s | tee -a %s &" % (VerifiableProducer.STDOUT_CAPTURE, VerifiableProducer.STDOUT_CAPTURE)
diff --git a/tests/kafkatest/tests/streams/base_streams_test.py b/tests/kafkatest/tests/streams/base_streams_test.py
index 70da44e..320d4b2 100644
--- a/tests/kafkatest/tests/streams/base_streams_test.py
+++ b/tests/kafkatest/tests/streams/base_streams_test.py
@@ -38,13 +38,14 @@ class BaseStreamsTest(KafkaTest):
                                   client_id,
                                   max_messages=num_messages)
 
-    def get_producer(self, topic, num_messages):
+    def get_producer(self, topic, num_messages, repeating_keys=None):
         return VerifiableProducer(self.test_context,
                                   1,
                                   self.kafka,
                                   topic,
                                   max_messages=num_messages,
-                                  acks=1)
+                                  acks=1,
+                                  repeating_keys=repeating_keys)
 
     def assert_produce_consume(self,
                                streams_source_topic,
diff --git a/tests/kafkatest/tests/streams/streams_standby_replica_test.py b/tests/kafkatest/tests/streams/streams_standby_replica_test.py
index e901cb3..416a110 100644
--- a/tests/kafkatest/tests/streams/streams_standby_replica_test.py
+++ b/tests/kafkatest/tests/streams/streams_standby_replica_test.py
@@ -13,7 +13,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from kafkatest.services.streams import StreamsRepeatingIntegerKeyProducerService
+from ducktape.utils.util import wait_until
 from kafkatest.services.streams import StreamsStandbyTaskService
 from kafkatest.tests.streams.base_streams_test import BaseStreamsTest
 
@@ -30,7 +30,7 @@ class StreamsStandbyTask(BaseStreamsTest):
     streams_sink_topic_2 = "standbyTaskSink2"
     client_id = "stream-broker-resilience-verify-consumer"
 
-    num_messages = 60000
+    num_messages = 300000
 
     def __init__(self, test_context):
         super(StreamsStandbyTask, self).__init__(test_context,
@@ -42,15 +42,13 @@ class StreamsStandbyTask(BaseStreamsTest):
 
     def test_standby_tasks_rebalance(self):
 
-        driver_configs = "num_messages=%s,topics=%s" % (str(self.num_messages), self.streams_source_topic)
-
-        driver = StreamsRepeatingIntegerKeyProducerService(self.test_context, self.kafka, driver_configs)
-        driver.start()
-
         configs = self.get_configs(",sourceTopic=%s,sinkTopic1=%s,sinkTopic2=%s" % (self.streams_source_topic,
                                                                                     self.streams_sink_topic_1,
                                                                                     self.streams_sink_topic_2))
 
+        producer = self.get_producer(self.streams_source_topic, self.num_messages, repeating_keys=6)
+        producer.start()
+
         processor_1 = StreamsStandbyTaskService(self.test_context, self.kafka, configs)
         processor_2 = StreamsStandbyTaskService(self.test_context, self.kafka, configs)
         processor_3 = StreamsStandbyTaskService(self.test_context, self.kafka, configs)
@@ -113,7 +111,10 @@ class StreamsStandbyTask(BaseStreamsTest):
         self.assert_consume(self.client_id, "assert all messages consumed from %s" % self.streams_sink_topic_1, self.streams_sink_topic_1, self.num_messages)
         self.assert_consume(self.client_id, "assert all messages consumed from %s" % self.streams_sink_topic_2, self.streams_sink_topic_2, self.num_messages)
 
-        self.wait_for_verification(driver, "Producer shut down now, sent total {0} of requested {0}".format(str(self.num_messages)),
-                                   driver.STDOUT_FILE)
+        wait_until(lambda: producer.num_acked >= self.num_messages,
+                   timeout_sec=60,
+                   err_msg="Failed to send all %s messages" % str(self.num_messages))
+
+        producer.stop()
 
 
diff --git a/tools/src/main/java/org/apache/kafka/tools/VerifiableProducer.java b/tools/src/main/java/org/apache/kafka/tools/VerifiableProducer.java
index 975cba7..744142b 100644
--- a/tools/src/main/java/org/apache/kafka/tools/VerifiableProducer.java
+++ b/tools/src/main/java/org/apache/kafka/tools/VerifiableProducer.java
@@ -18,15 +18,22 @@ package org.apache.kafka.tools;
 
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.annotation.JsonPropertyOrder;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import net.sourceforge.argparse4j.ArgumentParsers;
+import net.sourceforge.argparse4j.inf.ArgumentParser;
+import net.sourceforge.argparse4j.inf.ArgumentParserException;
+import net.sourceforge.argparse4j.inf.Namespace;
+
 import org.apache.kafka.clients.producer.Callback;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
-
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.Exit;
 
 import java.io.FileInputStream;
 import java.io.FileNotFoundException;
@@ -36,13 +43,6 @@ import java.util.Properties;
 
 import static net.sourceforge.argparse4j.impl.Arguments.store;
 
-import net.sourceforge.argparse4j.ArgumentParsers;
-import net.sourceforge.argparse4j.inf.ArgumentParser;
-import net.sourceforge.argparse4j.inf.ArgumentParserException;
-import net.sourceforge.argparse4j.inf.Namespace;
-import org.apache.kafka.common.serialization.StringSerializer;
-import org.apache.kafka.common.utils.Exit;
-
 /**
  * Primarily intended for use with system testing, this producer prints metadata
  * in the form of JSON to stdout on each "send" request. For example, this helps
@@ -80,13 +80,20 @@ public class VerifiableProducer {
     // if null, then values are produced without a prefix
     private final Integer valuePrefix;
 
+    // If set, send messages with keys starting at 0 and incrementing by 1
+    // for each message produced; once the counter reaches the specified
+    // number, the key is reset to 0 (so keys range over 0..n-1).
+    private final Integer repeatingKeys;
+
+    private int keyCounter;
+
     // The create time to set in messages, in milliseconds since epoch
     private Long createTime;
 
     private final Long startTime;
 
     public VerifiableProducer(KafkaProducer<String, String> producer, String topic, int throughput, int maxMessages,
-                              Integer valuePrefix, Long createTime) {
+                              Integer valuePrefix, Long createTime, Integer repeatingKeys) {
 
         this.topic = topic;
         this.throughput = throughput;
@@ -95,6 +102,7 @@ public class VerifiableProducer {
         this.valuePrefix = valuePrefix;
         this.createTime = createTime;
         this.startTime = System.currentTimeMillis();
+        this.repeatingKeys = repeatingKeys;
 
     }
 
@@ -170,6 +178,14 @@ public class VerifiableProducer {
             .dest("valuePrefix")
             .help("If specified, each produced value will have this prefix with a dot separator");
 
+        parser.addArgument("--repeating-keys")
+            .action(store())
+            .required(false)
+            .type(Integer.class)
+            .metavar("REPEATING-KEYS")
+            .dest("repeatingKeys")
+            .help("If specified, each produced record will have a key starting at 0 increment by 1 up to the number specified (exclusive), then the key is set to 0 again");
+
         return parser;
     }
     
@@ -200,6 +216,7 @@ public class VerifiableProducer {
         String configFile = res.getString("producer.config");
         Integer valuePrefix = res.getInt("valuePrefix");
         Long createTime = (long) res.getInt("createTime");
+        Integer repeatingKeys = res.getInt("repeatingKeys");
 
         if (createTime == -1L)
             createTime = null;
@@ -224,7 +241,7 @@ public class VerifiableProducer {
         StringSerializer serializer = new StringSerializer();
         KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps, serializer, serializer);
 
-        return new VerifiableProducer(producer, topic, throughput, maxMessages, valuePrefix, createTime);
+        return new VerifiableProducer(producer, topic, throughput, maxMessages, valuePrefix, createTime, repeatingKeys);
     }
 
     /** Produce a message with given key and value. */
@@ -260,6 +277,17 @@ public class VerifiableProducer {
         return String.format("%d", val);
     }
 
+    public String getKey() {
+        String key = null;
+        if (repeatingKeys != null) {
+            key = Integer.toString(keyCounter++);
+            if (keyCounter == repeatingKeys) {
+                keyCounter = 0;
+            }
+        }
+        return key;
+    }
+
     /** Close the producer to flush any remaining messages. */
     public void close() {
         producer.close();
@@ -468,7 +496,7 @@ public class VerifiableProducer {
             }
             long sendStartMs = System.currentTimeMillis();
 
-            this.send(null, this.getValue(i));
+            this.send(this.getKey(), this.getValue(i));
 
             if (throttler.shouldThrottle(i, sendStartMs)) {
                 throttler.throttle();

-- 
To stop receiving notification emails like this one, please contact
guozhang@apache.org.