You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2016/11/11 18:37:21 UTC
kafka git commit: KAFKA-4081; KafkaConsumer should not allow negative offsets to be committed
Repository: kafka
Updated Branches:
refs/heads/trunk 64a860c58 -> 7282de39f
KAFKA-4081; KafkaConsumer should not allow negative offsets to be committed
Author: Mickael Maison <mi...@gmail.com>
Reviewers: Jiangjie Qin <be...@gmail.com>, Jason Gustafson <ja...@confluent.io>
Closes #1827 from mimaison/KAFKA-4081
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/7282de39
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/7282de39
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/7282de39
Branch: refs/heads/trunk
Commit: 7282de39f7b7cef1ca85f21771f2401a4a67eaec
Parents: 64a860c
Author: Mickael Maison <mi...@gmail.com>
Authored: Fri Nov 11 10:21:46 2016 -0800
Committer: Jason Gustafson <ja...@confluent.io>
Committed: Fri Nov 11 10:21:59 2016 -0800
----------------------------------------------------------------------
.../consumer/internals/ConsumerCoordinator.java | 3 +++
.../consumer/internals/ConsumerCoordinatorTest.java | 16 ++++++++++++++++
2 files changed, 19 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/7282de39/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index a8d94fa..2621c09 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -584,6 +584,9 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
Map<TopicPartition, OffsetCommitRequest.PartitionData> offsetData = new HashMap<>(offsets.size());
for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : offsets.entrySet()) {
OffsetAndMetadata offsetAndMetadata = entry.getValue();
+ if (offsetAndMetadata.offset() < 0) {
+ return RequestFuture.failure(new IllegalArgumentException("Invalid offset: " + offsetAndMetadata.offset()));
+ }
offsetData.put(entry.getKey(), new OffsetCommitRequest.PartitionData(
offsetAndMetadata.offset(), offsetAndMetadata.metadata()));
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/7282de39/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
index 3957615..a83d7a9 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
@@ -1115,6 +1115,22 @@ public class ConsumerCoordinatorTest {
coordinator.commitOffsetsSync(Collections.singletonMap(tp, new OffsetAndMetadata(100L)));
}
+ @Test(expected = IllegalArgumentException.class)
+ public void testCommitSyncNegativeOffset() {
+ client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code()));
+ coordinator.commitOffsetsSync(Collections.singletonMap(tp, new OffsetAndMetadata(-1L)));
+ }
+
+ @Test
+ public void testCommitAsyncNegativeOffset() {
+ int invokedBeforeTest = defaultOffsetCommitCallback.invoked;
+ client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code()));
+ coordinator.commitOffsetsAsync(Collections.singletonMap(tp, new OffsetAndMetadata(-1L)), null);
+ coordinator.invokeCompletedOffsetCommitCallbacks();
+ assertEquals(invokedBeforeTest + 1, defaultOffsetCommitCallback.invoked);
+ assertTrue(defaultOffsetCommitCallback.exception instanceof IllegalArgumentException);
+ }
+
@Test
public void testRefreshOffset() {
client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code()));