You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2015/12/10 19:49:06 UTC

kafka git commit: KAFKA-2893: Add a simple non-negative partition seek check

Repository: kafka
Updated Branches:
  refs/heads/trunk ed8748b7d -> 9d23b512c


KAFKA-2893: Add a simple non-negative partition seek check

Author: jinxing <ji...@fenbi.com>
Author: ZoneMayor <ji...@126.com>

Reviewers: Guozhang Wang

Closes #628 from ZoneMayor/trunk-KAFKA-2893


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/9d23b512
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/9d23b512
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/9d23b512

Branch: refs/heads/trunk
Commit: 9d23b512ccb95940782f51c93bd6cdbed3e938db
Parents: ed8748b
Author: Jin Xing <ji...@fenbi.com>
Authored: Thu Dec 10 10:49:02 2015 -0800
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Thu Dec 10 10:49:02 2015 -0800

----------------------------------------------------------------------
 .../apache/kafka/clients/consumer/KafkaConsumer.java  |  3 +++
 .../kafka/clients/consumer/KafkaConsumerTest.java     | 14 ++++++++++++++
 2 files changed, 17 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/9d23b512/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 912b307..772e621 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -1005,6 +1005,9 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      */
     @Override
     public void seek(TopicPartition partition, long offset) {
+        if (offset < 0) {
+            throw new IllegalArgumentException("seek offset must not be a negative number");
+        }
         acquire();
         try {
             log.debug("Seeking to offset {} for partition {}", offset, partition);

http://git-wip-us.apache.org/repos/asf/kafka/blob/9d23b512/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index 983c45d..5711852 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -23,6 +23,7 @@ import org.apache.kafka.test.MockMetricsReporter;
 import org.junit.Assert;
 import org.junit.Test;
 
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.Properties;
 
@@ -78,4 +79,17 @@ public class KafkaConsumerTest {
         Assert.assertTrue(consumer.subscription().isEmpty());
         Assert.assertTrue(consumer.assignment().isEmpty());
     }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testSeekNegative() {
+        Properties props = new Properties();
+        props.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, "testSeekNegative");
+        props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
+        props.setProperty(ConsumerConfig.METRIC_REPORTER_CLASSES_CONFIG, MockMetricsReporter.class.getName());
+
+        KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<byte[], byte[]>(
+                props, new ByteArrayDeserializer(), new ByteArrayDeserializer());
+        consumer.assign(Arrays.asList(new TopicPartition("nonExistTopic", 0)));
+        consumer.seek(new TopicPartition("nonExistTopic", 0), -1);
+    }
 }