You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2015/12/10 19:49:06 UTC
kafka git commit: KAFKA-2893: Add a simple non-negative partition
seek check
Repository: kafka
Updated Branches:
refs/heads/trunk ed8748b7d -> 9d23b512c
KAFKA-2893: Add a simple non-negative partition seek check
Author: jinxing <ji...@fenbi.com>
Author: ZoneMayor <ji...@126.com>
Reviewers: Guozhang Wang
Closes #628 from ZoneMayor/trunk-KAFKA-2893
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/9d23b512
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/9d23b512
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/9d23b512
Branch: refs/heads/trunk
Commit: 9d23b512ccb95940782f51c93bd6cdbed3e938db
Parents: ed8748b
Author: Jin Xing <ji...@fenbi.com>
Authored: Thu Dec 10 10:49:02 2015 -0800
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Thu Dec 10 10:49:02 2015 -0800
----------------------------------------------------------------------
.../apache/kafka/clients/consumer/KafkaConsumer.java | 3 +++
.../kafka/clients/consumer/KafkaConsumerTest.java | 14 ++++++++++++++
2 files changed, 17 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/9d23b512/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 912b307..772e621 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -1005,6 +1005,9 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
*/
@Override
public void seek(TopicPartition partition, long offset) {
+ if (offset < 0) {
+ throw new IllegalArgumentException("seek offset must not be a negative number");
+ }
acquire();
try {
log.debug("Seeking to offset {} for partition {}", offset, partition);
http://git-wip-us.apache.org/repos/asf/kafka/blob/9d23b512/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index 983c45d..5711852 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -23,6 +23,7 @@ import org.apache.kafka.test.MockMetricsReporter;
import org.junit.Assert;
import org.junit.Test;
+import java.util.Arrays;
import java.util.Collections;
import java.util.Properties;
@@ -78,4 +79,17 @@ public class KafkaConsumerTest {
Assert.assertTrue(consumer.subscription().isEmpty());
Assert.assertTrue(consumer.assignment().isEmpty());
}
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testSeekNegative() {
+ Properties props = new Properties();
+ props.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, "testSeekNegative");
+ props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
+ props.setProperty(ConsumerConfig.METRIC_REPORTER_CLASSES_CONFIG, MockMetricsReporter.class.getName());
+
+ KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<byte[], byte[]>(
+ props, new ByteArrayDeserializer(), new ByteArrayDeserializer());
+ consumer.assign(Arrays.asList(new TopicPartition("nonExistTopic", 0)));
+ consumer.seek(new TopicPartition("nonExistTopic", 0), -1);
+ }
}