You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2018/03/13 06:03:39 UTC

[kafka] branch trunk updated: KAFKA-6024; Move arg validation in KafkaConsumer ahead of `acquireAndEnsureOpen` (#4617)

This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 0bb8e66  KAFKA-6024; Move arg validation in KafkaConsumer ahead of `acquireAndEnsureOpen`  (#4617)
0bb8e66 is described below

commit 0bb8e66184931e2f7830cb713d9260cc0f3383a9
Author: Siva Santhalingam <si...@gmail.com>
AuthorDate: Mon Mar 12 23:03:32 2018 -0700

    KAFKA-6024; Move arg validation in KafkaConsumer ahead of `acquireAndEnsureOpen`  (#4617)
---
 .../kafka/clients/consumer/KafkaConsumer.java      | 25 +++++++++++-----------
 1 file changed, 12 insertions(+), 13 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 3cd034e..81137f3 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -966,13 +966,12 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      */
     @Override
     public void subscribe(Pattern pattern, ConsumerRebalanceListener listener) {
+        if (pattern == null)
+            throw new IllegalArgumentException("Topic pattern to subscribe to cannot be null");
+
         acquireAndEnsureOpen();
         try {
-            if (pattern == null)
-                throw new IllegalArgumentException("Topic pattern to subscribe to cannot be null");
-
             throwIfNoAssignorsConfigured();
-
             log.debug("Subscribed to pattern: {}", pattern);
             this.subscriptions.subscribe(pattern, listener);
             this.metadata.needMetadataForAllTopics(true);
@@ -1337,11 +1336,11 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      */
     @Override
     public void seek(TopicPartition partition, long offset) {
+        if (offset < 0)
+            throw new IllegalArgumentException("seek offset must not be a negative number");
+
         acquireAndEnsureOpen();
         try {
-            if (offset < 0)
-                throw new IllegalArgumentException("seek offset must not be a negative number");
-
             log.debug("Seeking to offset {} for partition {}", offset, partition);
             this.subscriptions.seek(partition, offset);
         } finally {
@@ -1357,11 +1356,11 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      * @throws IllegalArgumentException if {@code partitions} is {@code null} or the provided TopicPartition is not assigned to this consumer
      */
     public void seekToBeginning(Collection<TopicPartition> partitions) {
+        if (partitions == null)
+            throw new IllegalArgumentException("Partitions collection cannot be null");
+
         acquireAndEnsureOpen();
         try {
-            if (partitions == null) {
-                throw new IllegalArgumentException("Partitions collection cannot be null");
-            }
             Collection<TopicPartition> parts = partitions.size() == 0 ? this.subscriptions.assignedPartitions() : partitions;
             for (TopicPartition tp : parts) {
                 log.debug("Seeking to beginning of partition {}", tp);
@@ -1383,11 +1382,11 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      * @throws IllegalArgumentException if {@code partitions} is {@code null} or the provided TopicPartition is not assigned to this consumer
      */
     public void seekToEnd(Collection<TopicPartition> partitions) {
+        if (partitions == null)
+            throw new IllegalArgumentException("Partitions collection cannot be null");
+
         acquireAndEnsureOpen();
         try {
-            if (partitions == null) {
-                throw new IllegalArgumentException("Partitions collection cannot be null");
-            }
             Collection<TopicPartition> parts = partitions.size() == 0 ? this.subscriptions.assignedPartitions() : partitions;
             for (TopicPartition tp : parts) {
                 log.debug("Seeking to end of partition {}", tp);

-- 
To stop receiving notification emails like this one, please contact
jgus@apache.org.