Posted to commits@kafka.apache.org by gu...@apache.org on 2017/04/27 23:38:31 UTC

kafka git commit: KAFKA-5005: IntegrationTestUtils to override consumer configs and reuse consumer

Repository: kafka
Updated Branches:
  refs/heads/trunk efa479736 -> 36aef1c26


KAFKA-5005: IntegrationTestUtils to override consumer configs and reuse consumer

* Producer and Consumer `close` calls were not handled via `try-with-resources`
* `cleanRun` unused field removed
* Refactored handling of Consumer configuration in `IntegrationTestUtils` to ensure auto-committing of offsets and starting from `earliest`
  * As a result, reverted https://github.com/apache/kafka/pull/2921, since it is now redundant

Author: Armin Braun <me...@obrown.io>

Reviewers: Matthias J. Sax, Guozhang Wang

Closes #2920 from original-brownbear/cleanup-it-utils-closing
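
For illustration only, the sketch below shows the pattern this change converges on: the test consumer is built from a copy of the supplied config with auto.offset.reset forced to "earliest" and enable.auto.commit forced to "true", and the client is closed via try-with-resources. The class and method names in the sketch are placeholders chosen for this example, not the committed code (the real implementation is in the IntegrationTestUtils diff further down).

    // Sketch (assumed names), mirroring the pattern introduced by this commit.
    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.Consumer;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    public final class ConsumerReuseSketch {

        // Copy the caller's config and force the offset/commit behaviour the tests rely on,
        // so every test consumer starts from the earliest offset and auto-commits.
        private static <K, V> KafkaConsumer<K, V> createConsumer(final Properties consumerConfig) {
            final Properties copy = new Properties();
            copy.putAll(consumerConfig);
            copy.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
            copy.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
            return new KafkaConsumer<>(copy);
        }

        // The consumer is created once, reused for polling, and always closed
        // via try-with-resources instead of an explicit close() call.
        public static <K, V> void pollOnce(final String topic, final Properties consumerConfig) {
            try (final Consumer<K, V> consumer = createConsumer(consumerConfig)) {
                consumer.subscribe(Collections.singletonList(topic));
                for (final ConsumerRecord<K, V> record : consumer.poll(100)) {
                    System.out.println(record.key() + " -> " + record.value());
                }
            }
        }
    }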


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/36aef1c2
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/36aef1c2
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/36aef1c2

Branch: refs/heads/trunk
Commit: 36aef1c2682169571b410a4986680361db5eec01
Parents: efa4797
Author: Armin Braun <me...@obrown.io>
Authored: Thu Apr 27 16:38:28 2017 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Thu Apr 27 16:38:28 2017 -0700

----------------------------------------------------------------------
 .../processor/internals/StreamThread.java       |   2 -
 .../KTableKTableJoinIntegrationTest.java        |   2 -
 .../integration/utils/IntegrationTestUtils.java | 231 ++++++++++++-------
 3 files changed, 143 insertions(+), 92 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/36aef1c2/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 0fd9dd5..10a9307 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -209,7 +209,6 @@ public class StreamThread extends Thread {
     final StateDirectory stateDirectory;
     private String originalReset;
     private StreamPartitionAssignor partitionAssignor;
-    private boolean cleanRun = false;
     private long timerStartedMs;
     private long lastCleanMs;
     private long lastCommitMs;
@@ -325,7 +324,6 @@ public class StreamThread extends Thread {
 
         try {
             runLoop();
-            cleanRun = true;
         } catch (final KafkaException e) {
             // just re-throw the exception as it should be logged already
             throw e;

http://git-wip-us.apache.org/repos/asf/kafka/blob/36aef1c2/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableJoinIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableJoinIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableJoinIntegrationTest.java
index 15c955a..26deb92 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableJoinIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableJoinIntegrationTest.java
@@ -107,12 +107,10 @@ public class KTableKTableJoinIntegrationTest {
         IntegrationTestUtils.produceKeyValuesSynchronously(TABLE_1, table1, producerConfig, MOCK_TIME);
         IntegrationTestUtils.produceKeyValuesSynchronously(TABLE_2, table2, producerConfig, MOCK_TIME);
 
-
         CONSUMER_CONFIG.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
         CONSUMER_CONFIG.put(ConsumerConfig.GROUP_ID_CONFIG, "ktable-ktable-consumer");
         CONSUMER_CONFIG.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
         CONSUMER_CONFIG.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
-        CONSUMER_CONFIG.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
     }
 
     @Before

http://git-wip-us.apache.org/repos/asf/kafka/blob/36aef1c2/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
index 3c52e19..7cdc180 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
@@ -20,6 +20,8 @@ import kafka.api.PartitionStateInfo;
 import kafka.api.Request;
 import kafka.server.KafkaServer;
 import kafka.server.MetadataCache;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
@@ -56,62 +58,12 @@ public class IntegrationTestUtils {
     public static final String INTERNAL_LEAVE_GROUP_ON_CLOSE = "internal.leave.group.on.close";
 
     /**
-     * Returns up to `maxMessages` message-values from the topic.
-     *
-     * @param topic          Kafka topic to read messages from
-     * @param consumerConfig Kafka consumer configuration
-     * @param waitTime       Maximum wait time in milliseconds
-     * @param maxMessages    Maximum number of messages to read via the consumer.
-     * @return The values retrieved via the consumer.
-     */
-    public static <V> List<V> readValues(final String topic, final Properties consumerConfig, final long waitTime, final int maxMessages) {
-        final List<V> returnList = new ArrayList<>();
-        final List<KeyValue<Object, V>> kvs = readKeyValues(topic, consumerConfig, waitTime, maxMessages);
-        for (final KeyValue<?, V> kv : kvs) {
-            returnList.add(kv.value);
-        }
-        return returnList;
-    }
-
-    /**
-     * Returns up to `maxMessages` by reading via the provided consumer (the topic(s) to read from
-     * are already configured in the consumer).
-     *
-     * @param topic          Kafka topic to read messages from
-     * @param consumerConfig Kafka consumer configuration
-     * @param waitTime       Maximum wait time in milliseconds
-     * @param maxMessages    Maximum number of messages to read via the consumer
-     * @return The KeyValue elements retrieved via the consumer
-     */
-    public static <K, V> List<KeyValue<K, V>> readKeyValues(final String topic, final Properties consumerConfig, final long waitTime, final int maxMessages) {
-        final KafkaConsumer<K, V> consumer = new KafkaConsumer<>(consumerConfig);
-        consumer.subscribe(Collections.singletonList(topic));
-        final int pollIntervalMs = 100;
-        final List<KeyValue<K, V>> consumedValues = new ArrayList<>();
-        int totalPollTimeMs = 0;
-        while (totalPollTimeMs < waitTime && continueConsuming(consumedValues.size(), maxMessages)) {
-            totalPollTimeMs += pollIntervalMs;
-            final ConsumerRecords<K, V> records = consumer.poll(pollIntervalMs);
-            for (final ConsumerRecord<K, V> record : records) {
-                consumedValues.add(new KeyValue<>(record.key(), record.value()));
-            }
-        }
-
-        consumer.close();
-
-        return consumedValues;
-    }
-
-    private static boolean continueConsuming(final int messagesConsumed, final int maxMessages) {
-        return maxMessages <= 0 || messagesConsumed < maxMessages;
-    }
-
-    /**
      * Removes local state stores.  Useful to reset state in-between integration test runs.
      *
      * @param streamsConfiguration Streams configuration settings
      */
-    public static void purgeLocalStreamsState(final Properties streamsConfiguration) throws IOException {
+    public static void purgeLocalStreamsState(final Properties streamsConfiguration) throws
+        IOException {
         final String tmpDir = TestUtils.IO_TMP_DIR.getPath();
         final String path = streamsConfiguration.getProperty(StreamsConfig.STATE_DIR_CONFIG);
         if (path != null) {
@@ -148,14 +100,14 @@ public class IntegrationTestUtils {
                                                                          final Properties producerConfig,
                                                                          final Long timestamp)
         throws ExecutionException, InterruptedException {
-        final Producer<K, V> producer = new KafkaProducer<>(producerConfig);
-        for (final KeyValue<K, V> record : records) {
-            final Future<RecordMetadata> f = producer.send(
-                new ProducerRecord<>(topic, null, timestamp, record.key, record.value));
-            f.get();
+        try (Producer<K, V> producer = new KafkaProducer<>(producerConfig)) {
+            for (final KeyValue<K, V> record : records) {
+                final Future<RecordMetadata> f = producer.send(
+                    new ProducerRecord<>(topic, null, timestamp, record.key, record.value));
+                f.get();
+            }
+            producer.flush();
         }
-        producer.flush();
-        producer.close();
     }
 
     public static <V> void produceValuesSynchronously(
@@ -192,20 +144,21 @@ public class IntegrationTestUtils {
                                                                                   final int expectedNumRecords,
                                                                                   final long waitTime) throws InterruptedException {
         final List<KeyValue<K, V>> accumData = new ArrayList<>();
-
-        final TestCondition valuesRead = new TestCondition() {
-            @Override
-            public boolean conditionMet() {
-                final List<KeyValue<K, V>> readData = readKeyValues(topic, consumerConfig, waitTime, expectedNumRecords);
-                accumData.addAll(readData);
-                return accumData.size() >= expectedNumRecords;
-            }
-        };
-
-        final String conditionDetails = "Expecting " + expectedNumRecords + " records from topic " + topic + " while only received " + accumData.size() + ": " + accumData;
-
-        TestUtils.waitForCondition(valuesRead, waitTime, conditionDetails);
-
+        try (final Consumer<K, V> consumer = createConsumer(consumerConfig)) {
+            final TestCondition valuesRead = new TestCondition() {
+                @Override
+                public boolean conditionMet() {
+                    final List<KeyValue<K, V>> readData =
+                        readKeyValues(topic, consumer, waitTime, expectedNumRecords);
+                    accumData.addAll(readData);
+                    return accumData.size() >= expectedNumRecords;
+                }
+            };
+            final String conditionDetails =
+                "Expecting " + expectedNumRecords + " records from topic " + topic +
+                    " while only received " + accumData.size() + ": " + accumData;
+            TestUtils.waitForCondition(valuesRead, waitTime, conditionDetails);
+        }
         return accumData;
     }
 
@@ -232,21 +185,21 @@ public class IntegrationTestUtils {
                                                                 final int expectedNumRecords,
                                                                 final long waitTime) throws InterruptedException {
         final List<V> accumData = new ArrayList<>();
-
-        final TestCondition valuesRead = new TestCondition() {
-            @Override
-            public boolean conditionMet() {
-                final List<V> readData = readValues(topic, consumerConfig, waitTime, expectedNumRecords);
-                accumData.addAll(readData);
-
-                return accumData.size() >= expectedNumRecords;
-            }
-        };
-
-        final String conditionDetails = "Expecting " + expectedNumRecords + " records from topic " + topic + " while only received " + accumData.size() + ": " + accumData;
-
-        TestUtils.waitForCondition(valuesRead, waitTime, conditionDetails);
-
+        try (final Consumer<Object, V> consumer = createConsumer(consumerConfig)) {
+            final TestCondition valuesRead = new TestCondition() {
+                @Override
+                public boolean conditionMet() {
+                    final List<V> readData =
+                        readValues(topic, consumer, waitTime, expectedNumRecords);
+                    accumData.addAll(readData);
+                    return accumData.size() >= expectedNumRecords;
+                }
+            };
+            final String conditionDetails =
+                "Expecting " + expectedNumRecords + " records from topic " + topic +
+                    " while only received " + accumData.size() + ": " + accumData;
+            TestUtils.waitForCondition(valuesRead, waitTime, conditionDetails);
+        }
         return accumData;
     }
 
@@ -287,4 +240,106 @@ public class IntegrationTestUtils {
         }, timeout, "metadata for topic=" + topic + " partition=" + partition + " not propagated to all brokers");
 
     }
+
+    /**
+     * Returns up to `maxMessages` message-values from the topic.
+     *
+     * @param topic          Kafka topic to read messages from
+     * @param consumerConfig Kafka consumer config
+     * @param waitTime       Maximum wait time in milliseconds
+     * @param maxMessages    Maximum number of messages to read via the consumer.
+     * @return The values retrieved via the consumer.
+     */
+    public static <V> List<V> readValues(final String topic, final Properties consumerConfig,
+        final long waitTime, final int maxMessages) {
+        final List<V> returnList;
+        try (final Consumer<Object, V> consumer = createConsumer(consumerConfig)) {
+            returnList = readValues(topic, consumer, waitTime, maxMessages);
+        }
+        return returnList;
+    }
+
+    /**
+     * Returns up to `maxMessages` by reading via the provided consumer (the topic(s) to read from
+     * are already configured in the consumer).
+     *
+     * @param topic          Kafka topic to read messages from
+     * @param consumerConfig Kafka consumer config
+     * @param waitTime       Maximum wait time in milliseconds
+     * @param maxMessages    Maximum number of messages to read via the consumer
+     * @return The KeyValue elements retrieved via the consumer
+     */
+    public static <K, V> List<KeyValue<K, V>> readKeyValues(final String topic,
+        final Properties consumerConfig, final long waitTime, final int maxMessages) {
+        final List<KeyValue<K, V>> consumedValues;
+        try (final Consumer<K, V> consumer = createConsumer(consumerConfig)) {
+            consumedValues = readKeyValues(topic, consumer, waitTime, maxMessages);
+        }
+        return consumedValues;
+    }
+
+    /**
+     * Returns up to `maxMessages` message-values from the topic.
+     *
+     * @param topic          Kafka topic to read messages from
+     * @param consumer       Kafka consumer
+     * @param waitTime       Maximum wait time in milliseconds
+     * @param maxMessages    Maximum number of messages to read via the consumer.
+     * @return The values retrieved via the consumer.
+     */
+    private static <V> List<V> readValues(final String topic, final Consumer<Object, V> consumer, final long waitTime, final int maxMessages) {
+        final List<V> returnList = new ArrayList<>();
+        final List<KeyValue<Object, V>> kvs = readKeyValues(topic, consumer, waitTime, maxMessages);
+        for (final KeyValue<?, V> kv : kvs) {
+            returnList.add(kv.value);
+        }
+        return returnList;
+    }
+
+    /**
+     * Returns up to `maxMessages` by reading via the provided consumer (the topic(s) to read from
+     * are already configured in the consumer).
+     *
+     * @param topic          Kafka topic to read messages from
+     * @param consumer       Kafka consumer
+     * @param waitTime       Maximum wait time in milliseconds
+     * @param maxMessages    Maximum number of messages to read via the consumer
+     * @return The KeyValue elements retrieved via the consumer
+     */
+    private static <K, V> List<KeyValue<K, V>> readKeyValues(final String topic,
+        final Consumer<K, V> consumer, final long waitTime, final int maxMessages) {
+        final List<KeyValue<K, V>> consumedValues;
+        consumer.subscribe(Collections.singletonList(topic));
+        final int pollIntervalMs = 100;
+        consumedValues = new ArrayList<>();
+        int totalPollTimeMs = 0;
+        while (totalPollTimeMs < waitTime &&
+            continueConsuming(consumedValues.size(), maxMessages)) {
+            totalPollTimeMs += pollIntervalMs;
+            final ConsumerRecords<K, V> records = consumer.poll(pollIntervalMs);
+            for (final ConsumerRecord<K, V> record : records) {
+                consumedValues.add(new KeyValue<>(record.key(), record.value()));
+            }
+        }
+        return consumedValues;
+    }
+
+    private static boolean continueConsuming(final int messagesConsumed, final int maxMessages) {
+        return maxMessages <= 0 || messagesConsumed < maxMessages;
+    }
+
+    /**
+     * Sets up a {@link KafkaConsumer} from a copy of the given configuration that has
+     * {@link ConsumerConfig#AUTO_OFFSET_RESET_CONFIG} set to "earliest" and {@link ConsumerConfig#ENABLE_AUTO_COMMIT_CONFIG}
+     * set to "true" to prevent missing events as well as repeat consumption.
+     * @param consumerConfig Consumer configuration
+     * @return Consumer
+     */
+    private static <K, V> KafkaConsumer<K, V> createConsumer(final Properties consumerConfig) {
+        final Properties filtered = new Properties();
+        filtered.putAll(consumerConfig);
+        filtered.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        filtered.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
+        return new KafkaConsumer<>(filtered);
+    }
 }