You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain-text copy.
Posted to commits@kafka.apache.org by jg...@apache.org on 2016/11/11 18:37:21 UTC

kafka git commit: KAFKA-4081; KafkaConsumer should not allow negative offsets to be committed

Repository: kafka
Updated Branches:
  refs/heads/trunk 64a860c58 -> 7282de39f


KAFKA-4081; KafkaConsumer should not allow negative offsets to be committed

Author: Mickael Maison <mi...@gmail.com>

Reviewers: Jiangjie Qin <be...@gmail.com>, Jason Gustafson <ja...@confluent.io>

Closes #1827 from mimaison/KAFKA-4081


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/7282de39
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/7282de39
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/7282de39

Branch: refs/heads/trunk
Commit: 7282de39f7b7cef1ca85f21771f2401a4a67eaec
Parents: 64a860c
Author: Mickael Maison <mi...@gmail.com>
Authored: Fri Nov 11 10:21:46 2016 -0800
Committer: Jason Gustafson <ja...@confluent.io>
Committed: Fri Nov 11 10:21:59 2016 -0800

----------------------------------------------------------------------
 .../consumer/internals/ConsumerCoordinator.java     |  3 +++
 .../consumer/internals/ConsumerCoordinatorTest.java | 16 ++++++++++++++++
 2 files changed, 19 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/7282de39/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index a8d94fa..2621c09 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -584,6 +584,9 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
         Map<TopicPartition, OffsetCommitRequest.PartitionData> offsetData = new HashMap<>(offsets.size());
         for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : offsets.entrySet()) {
             OffsetAndMetadata offsetAndMetadata = entry.getValue();
+            if (offsetAndMetadata.offset() < 0) {
+                return RequestFuture.failure(new IllegalArgumentException("Invalid offset: " + offsetAndMetadata.offset()));
+            }
             offsetData.put(entry.getKey(), new OffsetCommitRequest.PartitionData(
                     offsetAndMetadata.offset(), offsetAndMetadata.metadata()));
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/7282de39/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
index 3957615..a83d7a9 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
@@ -1115,6 +1115,22 @@ public class ConsumerCoordinatorTest {
         coordinator.commitOffsetsSync(Collections.singletonMap(tp, new OffsetAndMetadata(100L)));
     }
 
+    @Test(expected = IllegalArgumentException.class)
+    public void testCommitSyncNegativeOffset() {
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code()));
+        coordinator.commitOffsetsSync(Collections.singletonMap(tp, new OffsetAndMetadata(-1L)));
+    }
+
+    @Test
+    public void testCommitAsyncNegativeOffset() {
+        int invokedBeforeTest = defaultOffsetCommitCallback.invoked;
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code()));
+        coordinator.commitOffsetsAsync(Collections.singletonMap(tp, new OffsetAndMetadata(-1L)), null);
+        coordinator.invokeCompletedOffsetCommitCallbacks();
+        assertEquals(invokedBeforeTest + 1, defaultOffsetCommitCallback.invoked);
+        assertTrue(defaultOffsetCommitCallback.exception instanceof IllegalArgumentException);
+    }
+
     @Test
     public void testRefreshOffset() {
         client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code()));