You are viewing a plain-text version of this content. The link to its canonical version is not preserved in this plain-text rendering.
Posted to commits@kafka.apache.org by gu...@apache.org on 2018/04/28 05:13:04 UTC
[kafka] branch trunk updated: MINOR: update VerifiableProducer to
send keys if configured and removed StreamsRepeatingKeyProducerService
(#4841)
This is an automated email from the ASF dual-hosted git repository.
guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new c6fd3d4 MINOR: update VerifiableProducer to send keys if configured and removed StreamsRepeatingKeyProducerService (#4841)
c6fd3d4 is described below
commit c6fd3d488e8dba904f6abb2324a89466fc354387
Author: Bill Bejeck <bb...@gmail.com>
AuthorDate: Sat Apr 28 01:12:57 2018 -0400
MINOR: update VerifiableProducer to send keys if configured and removed StreamsRepeatingKeyProducerService (#4841)
This PR does the following:
* Remove the StreamsRepeatingIntegerKeyProducerService and the associated Java class
* Add a parameter to VerifiableProducer.java to enable sending keys when specified
* Update the corresponding Python file verifiable_producer.py to support the new parameter.
Reviewers: Matthias J Sax <ma...@confluent.io>, Guozhang Wang <wa...@gmail.com>
---
.../tests/StreamsRepeatingIntegerKeyProducer.java | 123 ---------------------
tests/kafkatest/services/streams.py | 6 -
tests/kafkatest/services/verifiable_producer.py | 11 +-
tests/kafkatest/tests/streams/base_streams_test.py | 5 +-
.../tests/streams/streams_standby_replica_test.py | 19 ++--
.../org/apache/kafka/tools/VerifiableProducer.java | 54 ++++++---
6 files changed, 61 insertions(+), 157 deletions(-)
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsRepeatingIntegerKeyProducer.java b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsRepeatingIntegerKeyProducer.java
deleted file mode 100644
index 15a9fa0..0000000
--- a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsRepeatingIntegerKeyProducer.java
+++ /dev/null
@@ -1,123 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.tests;
-
-import org.apache.kafka.clients.producer.Callback;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.ProducerConfig;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.clients.producer.RecordMetadata;
-import org.apache.kafka.common.errors.TimeoutException;
-import org.apache.kafka.common.serialization.StringSerializer;
-import org.apache.kafka.common.utils.Utils;
-import org.apache.kafka.streams.StreamsConfig;
-
-import java.io.IOException;
-import java.util.Map;
-import java.util.Properties;
-
-/**
- * Utility class used to send messages with integer keys
- * repeating in sequence every 1000 messages. Multiple topics for publishing
- * can be provided in the config map with key of 'topics' and ';' delimited list of output topics
- */
-public class StreamsRepeatingIntegerKeyProducer {
-
- private static volatile boolean keepProducing = true;
- private volatile static int messageCounter = 0;
-
- public static void main(final String[] args) throws IOException {
- if (args.length < 2) {
- System.err.println("StreamsStandByReplicaTest are expecting two parameters: propFile, additionalConfigs; but only see " + args.length + " parameter");
- System.exit(1);
- }
-
- System.out.println("StreamsTest instance started");
-
- final String propFileName = args[0];
- final String configString = args[1];
-
- final Properties streamsProperties = Utils.loadProps(propFileName);
- final String kafka = streamsProperties.getProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG);
-
- if (kafka == null) {
- System.err.println("No bootstrap kafka servers specified in " + StreamsConfig.BOOTSTRAP_SERVERS_CONFIG);
- System.exit(1);
- }
-
- final Map<String, String> configs = SystemTestUtil.parseConfigs(configString);
- System.out.println("Using provided configs " + configs);
-
- final int numMessages = configs.containsKey("num_messages") ? Integer.parseInt(configs.get("num_messages")) : 1000;
-
- final Properties producerProps = new Properties();
- producerProps.put(ProducerConfig.CLIENT_ID_CONFIG, "StreamsRepeatingIntegerKeyProducer");
- producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
- producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
- producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
- producerProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
-
- final String value = "testingValue";
- Integer key = 0;
-
- Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
- @Override
- public void run() {
- keepProducing = false;
- }
- }));
-
- final String[] topics = configs.get("topics").split(";");
- final int totalMessagesToProduce = numMessages * topics.length;
-
- try (final KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(producerProps)) {
-
- while (keepProducing && messageCounter < totalMessagesToProduce) {
- for (final String topic : topics) {
- final ProducerRecord<String, String> producerRecord = new ProducerRecord<>(topic, key.toString(), value + key);
- kafkaProducer.send(producerRecord, new Callback() {
- @Override
- public void onCompletion(final RecordMetadata metadata, final Exception exception) {
- if (exception != null) {
- exception.printStackTrace(System.err);
- System.err.flush();
- if (exception instanceof TimeoutException) {
- try {
- // message == org.apache.kafka.common.errors.TimeoutException: Expiring 4 record(s) for data-0: 30004 ms has passed since last attempt plus backoff time
- final int expired = Integer.parseInt(exception.getMessage().split(" ")[2]);
- messageCounter -= expired;
- } catch (final Exception ignore) {
- }
- }
- }
- }
- });
- messageCounter += 1;
- }
- key += 1;
- if (key % 1000 == 0) {
- System.out.println("Sent 1000 messages");
- Utils.sleep(100);
- key = 0;
- }
- }
- }
- System.out.println("Producer shut down now, sent total " + messageCounter + " of requested " + totalMessagesToProduce);
- System.out.flush();
- }
-}
diff --git a/tests/kafkatest/services/streams.py b/tests/kafkatest/services/streams.py
index ec6a081..f268ab8 100644
--- a/tests/kafkatest/services/streams.py
+++ b/tests/kafkatest/services/streams.py
@@ -397,12 +397,6 @@ class StreamsStandbyTaskService(StreamsTestBaseService):
configs)
-class StreamsRepeatingIntegerKeyProducerService(StreamsTestBaseService):
- def __init__(self, test_context, kafka, configs):
- super(StreamsRepeatingIntegerKeyProducerService, self).__init__(test_context,
- kafka,
- "org.apache.kafka.streams.tests.StreamsRepeatingIntegerKeyProducer",
- configs)
class StreamsUpgradeTestJobRunnerService(StreamsTestBaseService):
def __init__(self, test_context, kafka):
super(StreamsUpgradeTestJobRunnerService, self).__init__(test_context,
diff --git a/tests/kafkatest/services/verifiable_producer.py b/tests/kafkatest/services/verifiable_producer.py
index 17f1ec3..cbce27e 100644
--- a/tests/kafkatest/services/verifiable_producer.py
+++ b/tests/kafkatest/services/verifiable_producer.py
@@ -15,16 +15,16 @@
import json
import os
-import time
-from ducktape.services.background_thread import BackgroundThreadService
+import time
from ducktape.cluster.remoteaccount import RemoteCommandError
-
+from ducktape.services.background_thread import BackgroundThreadService
from kafkatest.directory_layout.kafka_path import KafkaPathResolverMixin
from kafkatest.services.verifiable_client import VerifiableClientMixin
from kafkatest.utils import is_int, is_int_with_prefix
from kafkatest.version import DEV_BRANCH
+
class VerifiableProducer(KafkaPathResolverMixin, VerifiableClientMixin, BackgroundThreadService):
"""This service wraps org.apache.kafka.tools.VerifiableProducer for use in
system testing.
@@ -57,7 +57,7 @@ class VerifiableProducer(KafkaPathResolverMixin, VerifiableClientMixin, Backgrou
def __init__(self, context, num_nodes, kafka, topic, max_messages=-1, throughput=100000,
message_validator=is_int, compression_types=None, version=DEV_BRANCH, acks=None,
stop_timeout_sec=150, request_timeout_sec=30, log_level="INFO",
- enable_idempotence=False, offline_nodes=[], create_time=-1):
+ enable_idempotence=False, offline_nodes=[], create_time=-1, repeating_keys=None):
"""
:param max_messages is a number of messages to be produced per producer
:param message_validator checks for an expected format of messages produced. There are
@@ -93,6 +93,7 @@ class VerifiableProducer(KafkaPathResolverMixin, VerifiableClientMixin, Backgrou
self.enable_idempotence = enable_idempotence
self.offline_nodes = offline_nodes
self.create_time = create_time
+ self.repeating_keys = repeating_keys
def java_class_name(self):
return "VerifiableProducer"
@@ -198,6 +199,8 @@ class VerifiableProducer(KafkaPathResolverMixin, VerifiableClientMixin, Backgrou
cmd += " --acks %s " % str(self.acks)
if self.create_time > -1:
cmd += " --message-create-time %s " % str(self.create_time)
+ if self.repeating_keys is not None:
+ cmd += " --repeating-keys %s " % str(self.repeating_keys)
cmd += " --producer.config %s" % VerifiableProducer.CONFIG_FILE
cmd += " 2>> %s | tee -a %s &" % (VerifiableProducer.STDOUT_CAPTURE, VerifiableProducer.STDOUT_CAPTURE)
diff --git a/tests/kafkatest/tests/streams/base_streams_test.py b/tests/kafkatest/tests/streams/base_streams_test.py
index 70da44e..320d4b2 100644
--- a/tests/kafkatest/tests/streams/base_streams_test.py
+++ b/tests/kafkatest/tests/streams/base_streams_test.py
@@ -38,13 +38,14 @@ class BaseStreamsTest(KafkaTest):
client_id,
max_messages=num_messages)
- def get_producer(self, topic, num_messages):
+ def get_producer(self, topic, num_messages, repeating_keys=None):
return VerifiableProducer(self.test_context,
1,
self.kafka,
topic,
max_messages=num_messages,
- acks=1)
+ acks=1,
+ repeating_keys=repeating_keys)
def assert_produce_consume(self,
streams_source_topic,
diff --git a/tests/kafkatest/tests/streams/streams_standby_replica_test.py b/tests/kafkatest/tests/streams/streams_standby_replica_test.py
index e901cb3..416a110 100644
--- a/tests/kafkatest/tests/streams/streams_standby_replica_test.py
+++ b/tests/kafkatest/tests/streams/streams_standby_replica_test.py
@@ -13,7 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from kafkatest.services.streams import StreamsRepeatingIntegerKeyProducerService
+from ducktape.utils.util import wait_until
from kafkatest.services.streams import StreamsStandbyTaskService
from kafkatest.tests.streams.base_streams_test import BaseStreamsTest
@@ -30,7 +30,7 @@ class StreamsStandbyTask(BaseStreamsTest):
streams_sink_topic_2 = "standbyTaskSink2"
client_id = "stream-broker-resilience-verify-consumer"
- num_messages = 60000
+ num_messages = 300000
def __init__(self, test_context):
super(StreamsStandbyTask, self).__init__(test_context,
@@ -42,15 +42,13 @@ class StreamsStandbyTask(BaseStreamsTest):
def test_standby_tasks_rebalance(self):
- driver_configs = "num_messages=%s,topics=%s" % (str(self.num_messages), self.streams_source_topic)
-
- driver = StreamsRepeatingIntegerKeyProducerService(self.test_context, self.kafka, driver_configs)
- driver.start()
-
configs = self.get_configs(",sourceTopic=%s,sinkTopic1=%s,sinkTopic2=%s" % (self.streams_source_topic,
self.streams_sink_topic_1,
self.streams_sink_topic_2))
+ producer = self.get_producer(self.streams_source_topic, self.num_messages, repeating_keys=6)
+ producer.start()
+
processor_1 = StreamsStandbyTaskService(self.test_context, self.kafka, configs)
processor_2 = StreamsStandbyTaskService(self.test_context, self.kafka, configs)
processor_3 = StreamsStandbyTaskService(self.test_context, self.kafka, configs)
@@ -113,7 +111,10 @@ class StreamsStandbyTask(BaseStreamsTest):
self.assert_consume(self.client_id, "assert all messages consumed from %s" % self.streams_sink_topic_1, self.streams_sink_topic_1, self.num_messages)
self.assert_consume(self.client_id, "assert all messages consumed from %s" % self.streams_sink_topic_2, self.streams_sink_topic_2, self.num_messages)
- self.wait_for_verification(driver, "Producer shut down now, sent total {0} of requested {0}".format(str(self.num_messages)),
- driver.STDOUT_FILE)
+ wait_until(lambda: producer.num_acked >= self.num_messages,
+ timeout_sec=60,
+ err_msg="Failed to send all %s messages" % str(self.num_messages))
+
+ producer.stop()
diff --git a/tools/src/main/java/org/apache/kafka/tools/VerifiableProducer.java b/tools/src/main/java/org/apache/kafka/tools/VerifiableProducer.java
index 975cba7..744142b 100644
--- a/tools/src/main/java/org/apache/kafka/tools/VerifiableProducer.java
+++ b/tools/src/main/java/org/apache/kafka/tools/VerifiableProducer.java
@@ -18,15 +18,22 @@ package org.apache.kafka.tools;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonPropertyOrder;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import net.sourceforge.argparse4j.ArgumentParsers;
+import net.sourceforge.argparse4j.inf.ArgumentParser;
+import net.sourceforge.argparse4j.inf.ArgumentParserException;
+import net.sourceforge.argparse4j.inf.Namespace;
+
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
-
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.Exit;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
@@ -36,13 +43,6 @@ import java.util.Properties;
import static net.sourceforge.argparse4j.impl.Arguments.store;
-import net.sourceforge.argparse4j.ArgumentParsers;
-import net.sourceforge.argparse4j.inf.ArgumentParser;
-import net.sourceforge.argparse4j.inf.ArgumentParserException;
-import net.sourceforge.argparse4j.inf.Namespace;
-import org.apache.kafka.common.serialization.StringSerializer;
-import org.apache.kafka.common.utils.Exit;
-
/**
* Primarily intended for use with system testing, this producer prints metadata
* in the form of JSON to stdout on each "send" request. For example, this helps
@@ -80,13 +80,20 @@ public class VerifiableProducer {
// if null, then values are produced without a prefix
private final Integer valuePrefix;
+ // Send messages with a key of 0 incrementing by 1 for
+ // each message produced when number specified is reached
+ // key is reset to 0
+ private final Integer repeatingKeys;
+
+ private int keyCounter;
+
// The create time to set in messages, in milliseconds since epoch
private Long createTime;
private final Long startTime;
public VerifiableProducer(KafkaProducer<String, String> producer, String topic, int throughput, int maxMessages,
- Integer valuePrefix, Long createTime) {
+ Integer valuePrefix, Long createTime, Integer repeatingKeys) {
this.topic = topic;
this.throughput = throughput;
@@ -95,6 +102,7 @@ public class VerifiableProducer {
this.valuePrefix = valuePrefix;
this.createTime = createTime;
this.startTime = System.currentTimeMillis();
+ this.repeatingKeys = repeatingKeys;
}
@@ -170,6 +178,14 @@ public class VerifiableProducer {
.dest("valuePrefix")
.help("If specified, each produced value will have this prefix with a dot separator");
+ parser.addArgument("--repeating-keys")
+ .action(store())
+ .required(false)
+ .type(Integer.class)
+ .metavar("REPEATING-KEYS")
+ .dest("repeatingKeys")
+ .help("If specified, each produced record will have a key starting at 0 increment by 1 up to the number specified (exclusive), then the key is set to 0 again");
+
return parser;
}
@@ -200,6 +216,7 @@ public class VerifiableProducer {
String configFile = res.getString("producer.config");
Integer valuePrefix = res.getInt("valuePrefix");
Long createTime = (long) res.getInt("createTime");
+ Integer repeatingKeys = res.getInt("repeatingKeys");
if (createTime == -1L)
createTime = null;
@@ -224,7 +241,7 @@ public class VerifiableProducer {
StringSerializer serializer = new StringSerializer();
KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps, serializer, serializer);
- return new VerifiableProducer(producer, topic, throughput, maxMessages, valuePrefix, createTime);
+ return new VerifiableProducer(producer, topic, throughput, maxMessages, valuePrefix, createTime, repeatingKeys);
}
/** Produce a message with given key and value. */
@@ -260,6 +277,17 @@ public class VerifiableProducer {
return String.format("%d", val);
}
+ public String getKey() {
+ String key = null;
+ if (repeatingKeys != null) {
+ key = Integer.toString(keyCounter++);
+ if (keyCounter == repeatingKeys) {
+ keyCounter = 0;
+ }
+ }
+ return key;
+ }
+
/** Close the producer to flush any remaining messages. */
public void close() {
producer.close();
@@ -468,7 +496,7 @@ public class VerifiableProducer {
}
long sendStartMs = System.currentTimeMillis();
- this.send(null, this.getValue(i));
+ this.send(this.getKey(), this.getValue(i));
if (throttler.shouldThrottle(i, sendStartMs)) {
throttler.throttle();
--
To stop receiving notification emails like this one, please contact
guozhang@apache.org.