You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2017/04/27 23:38:31 UTC
kafka git commit: KAFKA-5005: IntegrationTestUtils to override
consumer configs and reuse consumer
Repository: kafka
Updated Branches:
refs/heads/trunk efa479736 -> 36aef1c26
KAFKA-5005: IntegrationTestUtils to override consumer configs and reuse consumer
* Producer and Consumer `close` calls were not handled via `try-with-resources`
* `cleanRun` unused field removed
* Refactored handling of Consumer configuration in `IntegrationTestUtils` to ensure auto-committing of offsets and starting from `earliest`
* As a result reverted https://github.com/apache/kafka/pull/2921 since it's redundant now
Author: Armin Braun <me...@obrown.io>
Reviewers: Matthias J. Sax, Guozhang Wang
Closes #2920 from original-brownbear/cleanup-it-utils-closing
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/36aef1c2
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/36aef1c2
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/36aef1c2
Branch: refs/heads/trunk
Commit: 36aef1c2682169571b410a4986680361db5eec01
Parents: efa4797
Author: Armin Braun <me...@obrown.io>
Authored: Thu Apr 27 16:38:28 2017 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Thu Apr 27 16:38:28 2017 -0700
----------------------------------------------------------------------
.../processor/internals/StreamThread.java | 2 -
.../KTableKTableJoinIntegrationTest.java | 2 -
.../integration/utils/IntegrationTestUtils.java | 231 ++++++++++++-------
3 files changed, 143 insertions(+), 92 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/36aef1c2/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 0fd9dd5..10a9307 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -209,7 +209,6 @@ public class StreamThread extends Thread {
final StateDirectory stateDirectory;
private String originalReset;
private StreamPartitionAssignor partitionAssignor;
- private boolean cleanRun = false;
private long timerStartedMs;
private long lastCleanMs;
private long lastCommitMs;
@@ -325,7 +324,6 @@ public class StreamThread extends Thread {
try {
runLoop();
- cleanRun = true;
} catch (final KafkaException e) {
// just re-throw the exception as it should be logged already
throw e;
http://git-wip-us.apache.org/repos/asf/kafka/blob/36aef1c2/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableJoinIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableJoinIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableJoinIntegrationTest.java
index 15c955a..26deb92 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableJoinIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableJoinIntegrationTest.java
@@ -107,12 +107,10 @@ public class KTableKTableJoinIntegrationTest {
IntegrationTestUtils.produceKeyValuesSynchronously(TABLE_1, table1, producerConfig, MOCK_TIME);
IntegrationTestUtils.produceKeyValuesSynchronously(TABLE_2, table2, producerConfig, MOCK_TIME);
-
CONSUMER_CONFIG.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
CONSUMER_CONFIG.put(ConsumerConfig.GROUP_ID_CONFIG, "ktable-ktable-consumer");
CONSUMER_CONFIG.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
CONSUMER_CONFIG.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
- CONSUMER_CONFIG.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
}
@Before
http://git-wip-us.apache.org/repos/asf/kafka/blob/36aef1c2/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
index 3c52e19..7cdc180 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
@@ -20,6 +20,8 @@ import kafka.api.PartitionStateInfo;
import kafka.api.Request;
import kafka.server.KafkaServer;
import kafka.server.MetadataCache;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
@@ -56,62 +58,12 @@ public class IntegrationTestUtils {
public static final String INTERNAL_LEAVE_GROUP_ON_CLOSE = "internal.leave.group.on.close";
/**
- * Returns up to `maxMessages` message-values from the topic.
- *
- * @param topic Kafka topic to read messages from
- * @param consumerConfig Kafka consumer configuration
- * @param waitTime Maximum wait time in milliseconds
- * @param maxMessages Maximum number of messages to read via the consumer.
- * @return The values retrieved via the consumer.
- */
- public static <V> List<V> readValues(final String topic, final Properties consumerConfig, final long waitTime, final int maxMessages) {
- final List<V> returnList = new ArrayList<>();
- final List<KeyValue<Object, V>> kvs = readKeyValues(topic, consumerConfig, waitTime, maxMessages);
- for (final KeyValue<?, V> kv : kvs) {
- returnList.add(kv.value);
- }
- return returnList;
- }
-
- /**
- * Returns up to `maxMessages` by reading via the provided consumer (the topic(s) to read from
- * are already configured in the consumer).
- *
- * @param topic Kafka topic to read messages from
- * @param consumerConfig Kafka consumer configuration
- * @param waitTime Maximum wait time in milliseconds
- * @param maxMessages Maximum number of messages to read via the consumer
- * @return The KeyValue elements retrieved via the consumer
- */
- public static <K, V> List<KeyValue<K, V>> readKeyValues(final String topic, final Properties consumerConfig, final long waitTime, final int maxMessages) {
- final KafkaConsumer<K, V> consumer = new KafkaConsumer<>(consumerConfig);
- consumer.subscribe(Collections.singletonList(topic));
- final int pollIntervalMs = 100;
- final List<KeyValue<K, V>> consumedValues = new ArrayList<>();
- int totalPollTimeMs = 0;
- while (totalPollTimeMs < waitTime && continueConsuming(consumedValues.size(), maxMessages)) {
- totalPollTimeMs += pollIntervalMs;
- final ConsumerRecords<K, V> records = consumer.poll(pollIntervalMs);
- for (final ConsumerRecord<K, V> record : records) {
- consumedValues.add(new KeyValue<>(record.key(), record.value()));
- }
- }
-
- consumer.close();
-
- return consumedValues;
- }
-
- private static boolean continueConsuming(final int messagesConsumed, final int maxMessages) {
- return maxMessages <= 0 || messagesConsumed < maxMessages;
- }
-
- /**
* Removes local state stores. Useful to reset state in-between integration test runs.
*
* @param streamsConfiguration Streams configuration settings
*/
- public static void purgeLocalStreamsState(final Properties streamsConfiguration) throws IOException {
+ public static void purgeLocalStreamsState(final Properties streamsConfiguration) throws
+ IOException {
final String tmpDir = TestUtils.IO_TMP_DIR.getPath();
final String path = streamsConfiguration.getProperty(StreamsConfig.STATE_DIR_CONFIG);
if (path != null) {
@@ -148,14 +100,14 @@ public class IntegrationTestUtils {
final Properties producerConfig,
final Long timestamp)
throws ExecutionException, InterruptedException {
- final Producer<K, V> producer = new KafkaProducer<>(producerConfig);
- for (final KeyValue<K, V> record : records) {
- final Future<RecordMetadata> f = producer.send(
- new ProducerRecord<>(topic, null, timestamp, record.key, record.value));
- f.get();
+ try (Producer<K, V> producer = new KafkaProducer<>(producerConfig)) {
+ for (final KeyValue<K, V> record : records) {
+ final Future<RecordMetadata> f = producer.send(
+ new ProducerRecord<>(topic, null, timestamp, record.key, record.value));
+ f.get();
+ }
+ producer.flush();
}
- producer.flush();
- producer.close();
}
public static <V> void produceValuesSynchronously(
@@ -192,20 +144,21 @@ public class IntegrationTestUtils {
final int expectedNumRecords,
final long waitTime) throws InterruptedException {
final List<KeyValue<K, V>> accumData = new ArrayList<>();
-
- final TestCondition valuesRead = new TestCondition() {
- @Override
- public boolean conditionMet() {
- final List<KeyValue<K, V>> readData = readKeyValues(topic, consumerConfig, waitTime, expectedNumRecords);
- accumData.addAll(readData);
- return accumData.size() >= expectedNumRecords;
- }
- };
-
- final String conditionDetails = "Expecting " + expectedNumRecords + " records from topic " + topic + " while only received " + accumData.size() + ": " + accumData;
-
- TestUtils.waitForCondition(valuesRead, waitTime, conditionDetails);
-
+ try (final Consumer<K, V> consumer = createConsumer(consumerConfig)) {
+ final TestCondition valuesRead = new TestCondition() {
+ @Override
+ public boolean conditionMet() {
+ final List<KeyValue<K, V>> readData =
+ readKeyValues(topic, consumer, waitTime, expectedNumRecords);
+ accumData.addAll(readData);
+ return accumData.size() >= expectedNumRecords;
+ }
+ };
+ final String conditionDetails =
+ "Expecting " + expectedNumRecords + " records from topic " + topic +
+ " while only received " + accumData.size() + ": " + accumData;
+ TestUtils.waitForCondition(valuesRead, waitTime, conditionDetails);
+ }
return accumData;
}
@@ -232,21 +185,21 @@ public class IntegrationTestUtils {
final int expectedNumRecords,
final long waitTime) throws InterruptedException {
final List<V> accumData = new ArrayList<>();
-
- final TestCondition valuesRead = new TestCondition() {
- @Override
- public boolean conditionMet() {
- final List<V> readData = readValues(topic, consumerConfig, waitTime, expectedNumRecords);
- accumData.addAll(readData);
-
- return accumData.size() >= expectedNumRecords;
- }
- };
-
- final String conditionDetails = "Expecting " + expectedNumRecords + " records from topic " + topic + " while only received " + accumData.size() + ": " + accumData;
-
- TestUtils.waitForCondition(valuesRead, waitTime, conditionDetails);
-
+ try (final Consumer<Object, V> consumer = createConsumer(consumerConfig)) {
+ final TestCondition valuesRead = new TestCondition() {
+ @Override
+ public boolean conditionMet() {
+ final List<V> readData =
+ readValues(topic, consumer, waitTime, expectedNumRecords);
+ accumData.addAll(readData);
+ return accumData.size() >= expectedNumRecords;
+ }
+ };
+ final String conditionDetails =
+ "Expecting " + expectedNumRecords + " records from topic " + topic +
+ " while only received " + accumData.size() + ": " + accumData;
+ TestUtils.waitForCondition(valuesRead, waitTime, conditionDetails);
+ }
return accumData;
}
@@ -287,4 +240,106 @@ public class IntegrationTestUtils {
}, timeout, "metadata for topic=" + topic + " partition=" + partition + " not propagated to all brokers");
}
+
+ /**
+ * Returns up to `maxMessages` message-values from the topic.
+ *
+ * @param topic Kafka topic to read messages from
+ * @param consumerConfig Kafka consumer config
+ * @param waitTime Maximum wait time in milliseconds
+ * @param maxMessages Maximum number of messages to read via the consumer.
+ * @return The values retrieved via the consumer.
+ */
+ public static <V> List<V> readValues(final String topic, final Properties consumerConfig,
+ final long waitTime, final int maxMessages) {
+ final List<V> returnList;
+ try (final Consumer<Object, V> consumer = createConsumer(consumerConfig)) {
+ returnList = readValues(topic, consumer, waitTime, maxMessages);
+ }
+ return returnList;
+ }
+
+ /**
+ * Returns up to `maxMessages` key-value pairs from the topic, read via a consumer that is
+ * created from the given configuration and closed before returning.
+ *
+ * @param topic Kafka topic to read messages from
+ * @param consumerConfig Kafka consumer config
+ * @param waitTime Maximum wait time in milliseconds
+ * @param maxMessages Maximum number of messages to read via the consumer
+ * @return The KeyValue elements retrieved via the consumer
+ */
+ public static <K, V> List<KeyValue<K, V>> readKeyValues(final String topic,
+ final Properties consumerConfig, final long waitTime, final int maxMessages) {
+ final List<KeyValue<K, V>> consumedValues;
+ try (final Consumer<K, V> consumer = createConsumer(consumerConfig)) {
+ consumedValues = readKeyValues(topic, consumer, waitTime, maxMessages);
+ }
+ return consumedValues;
+ }
+
+ /**
+ * Returns up to `maxMessages` message-values from the topic.
+ *
+ * @param topic Kafka topic to read messages from
+ * @param consumer Kafka consumer
+ * @param waitTime Maximum wait time in milliseconds
+ * @param maxMessages Maximum number of messages to read via the consumer.
+ * @return The values retrieved via the consumer.
+ */
+ private static <V> List<V> readValues(final String topic, final Consumer<Object, V> consumer, final long waitTime, final int maxMessages) {
+ final List<V> returnList = new ArrayList<>();
+ final List<KeyValue<Object, V>> kvs = readKeyValues(topic, consumer, waitTime, maxMessages);
+ for (final KeyValue<?, V> kv : kvs) {
+ returnList.add(kv.value);
+ }
+ return returnList;
+ }
+
+ /**
+ * Returns up to `maxMessages` key-value pairs by reading via the provided consumer, which
+ * this method subscribes to the given topic.
+ *
+ * @param topic Kafka topic to read messages from
+ * @param consumer Kafka consumer
+ * @param waitTime Maximum wait time in milliseconds
+ * @param maxMessages Maximum number of messages to read via the consumer
+ * @return The KeyValue elements retrieved via the consumer
+ */
+ private static <K, V> List<KeyValue<K, V>> readKeyValues(final String topic,
+ final Consumer<K, V> consumer, final long waitTime, final int maxMessages) {
+ final List<KeyValue<K, V>> consumedValues;
+ consumer.subscribe(Collections.singletonList(topic));
+ final int pollIntervalMs = 100;
+ consumedValues = new ArrayList<>();
+ int totalPollTimeMs = 0;
+ while (totalPollTimeMs < waitTime &&
+ continueConsuming(consumedValues.size(), maxMessages)) {
+ totalPollTimeMs += pollIntervalMs;
+ final ConsumerRecords<K, V> records = consumer.poll(pollIntervalMs);
+ for (final ConsumerRecord<K, V> record : records) {
+ consumedValues.add(new KeyValue<>(record.key(), record.value()));
+ }
+ }
+ return consumedValues;
+ }
+
+ private static boolean continueConsuming(final int messagesConsumed, final int maxMessages) {
+ return maxMessages <= 0 || messagesConsumed < maxMessages;
+ }
+
+ /**
+ * Sets up a {@link KafkaConsumer} from a copy of the given configuration that has
+ * {@link ConsumerConfig#AUTO_OFFSET_RESET_CONFIG} set to "earliest" and {@link ConsumerConfig#ENABLE_AUTO_COMMIT_CONFIG}
+ * set to "true" to prevent missing events as well as repeat consumption.
+ * @param consumerConfig Consumer configuration
+ * @return Consumer
+ */
+ private static <K, V> KafkaConsumer<K, V> createConsumer(final Properties consumerConfig) {
+ final Properties filtered = new Properties();
+ filtered.putAll(consumerConfig);
+ filtered.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+ filtered.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
+ return new KafkaConsumer<>(filtered);
+ }
}