You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2018/03/13 06:03:39 UTC
[kafka] branch trunk updated: KAFKA-6024;
Move arg validation in KafkaConsumer ahead of
`acquireAndEnsureOpen` (#4617)
This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 0bb8e66 KAFKA-6024; Move arg validation in KafkaConsumer ahead of `acquireAndEnsureOpen` (#4617)
0bb8e66 is described below
commit 0bb8e66184931e2f7830cb713d9260cc0f3383a9
Author: Siva Santhalingam <si...@gmail.com>
AuthorDate: Mon Mar 12 23:03:32 2018 -0700
KAFKA-6024; Move arg validation in KafkaConsumer ahead of `acquireAndEnsureOpen` (#4617)
---
.../kafka/clients/consumer/KafkaConsumer.java | 25 +++++++++++-----------
1 file changed, 12 insertions(+), 13 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 3cd034e..81137f3 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -966,13 +966,12 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
*/
@Override
public void subscribe(Pattern pattern, ConsumerRebalanceListener listener) {
+ if (pattern == null)
+ throw new IllegalArgumentException("Topic pattern to subscribe to cannot be null");
+
acquireAndEnsureOpen();
try {
- if (pattern == null)
- throw new IllegalArgumentException("Topic pattern to subscribe to cannot be null");
-
throwIfNoAssignorsConfigured();
-
log.debug("Subscribed to pattern: {}", pattern);
this.subscriptions.subscribe(pattern, listener);
this.metadata.needMetadataForAllTopics(true);
@@ -1337,11 +1336,11 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
*/
@Override
public void seek(TopicPartition partition, long offset) {
+ if (offset < 0)
+ throw new IllegalArgumentException("seek offset must not be a negative number");
+
acquireAndEnsureOpen();
try {
- if (offset < 0)
- throw new IllegalArgumentException("seek offset must not be a negative number");
-
log.debug("Seeking to offset {} for partition {}", offset, partition);
this.subscriptions.seek(partition, offset);
} finally {
@@ -1357,11 +1356,11 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
* @throws IllegalArgumentException if {@code partitions} is {@code null} or the provided TopicPartition is not assigned to this consumer
*/
public void seekToBeginning(Collection<TopicPartition> partitions) {
+ if (partitions == null)
+ throw new IllegalArgumentException("Partitions collection cannot be null");
+
acquireAndEnsureOpen();
try {
- if (partitions == null) {
- throw new IllegalArgumentException("Partitions collection cannot be null");
- }
Collection<TopicPartition> parts = partitions.size() == 0 ? this.subscriptions.assignedPartitions() : partitions;
for (TopicPartition tp : parts) {
log.debug("Seeking to beginning of partition {}", tp);
@@ -1383,11 +1382,11 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
* @throws IllegalArgumentException if {@code partitions} is {@code null} or the provided TopicPartition is not assigned to this consumer
*/
public void seekToEnd(Collection<TopicPartition> partitions) {
+ if (partitions == null)
+ throw new IllegalArgumentException("Partitions collection cannot be null");
+
acquireAndEnsureOpen();
try {
- if (partitions == null) {
- throw new IllegalArgumentException("Partitions collection cannot be null");
- }
Collection<TopicPartition> parts = partitions.size() == 0 ? this.subscriptions.assignedPartitions() : partitions;
for (TopicPartition tp : parts) {
log.debug("Seeking to end of partition {}", tp);
--
To stop receiving notification emails like this one, please contact
jgus@apache.org.