You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2015/08/07 00:51:40 UTC

kafka git commit: KAFKA-2340: Improve KafkaConsumer Fetcher test coverage

Repository: kafka
Updated Branches:
  refs/heads/trunk 006b45c7e -> f6373e4d9


KAFKA-2340: Improve KafkaConsumer Fetcher test coverage

Author: Jason Gustafson <ja...@confluent.io>

Reviewers: Guozhang Wang

Closes #112 from hachikuji/KAFKA-2340 and squashes the following commits:

cc49ca2 [Jason Gustafson] KAFKA-2340; improve KafkaConsumer Fetcher test coverage


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/f6373e4d
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/f6373e4d
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/f6373e4d

Branch: refs/heads/trunk
Commit: f6373e4d9929d123a8474ab7673ee701f63ac593
Parents: 006b45c
Author: Jason Gustafson <ja...@confluent.io>
Authored: Thu Aug 6 15:52:43 2015 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Thu Aug 6 15:52:43 2015 -0700

----------------------------------------------------------------------
 .../clients/consumer/internals/Fetcher.java     |   4 +-
 .../org/apache/kafka/clients/MockClient.java    |  49 ++++++-
 .../clients/consumer/internals/FetcherTest.java | 133 +++++++++++++++++--
 3 files changed, 169 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/f6373e4d/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index 956197b..9dc6697 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -61,8 +61,8 @@ import java.util.Set;
  * This class manage the fetching process with the brokers.
  */
 public class Fetcher<K, V> {
-    private static final long EARLIEST_OFFSET_TIMESTAMP = -2L;
-    private static final long LATEST_OFFSET_TIMESTAMP = -1L;
+    public static final long EARLIEST_OFFSET_TIMESTAMP = -2L;
+    public static final long LATEST_OFFSET_TIMESTAMP = -1L;
 
     private static final Logger log = LoggerFactory.getLogger(Fetcher.class);
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/f6373e4d/clients/src/test/java/org/apache/kafka/clients/MockClient.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/MockClient.java b/clients/src/test/java/org/apache/kafka/clients/MockClient.java
index d9c97e9..9133d85 100644
--- a/clients/src/test/java/org/apache/kafka/clients/MockClient.java
+++ b/clients/src/test/java/org/apache/kafka/clients/MockClient.java
@@ -34,15 +34,24 @@ import org.apache.kafka.common.utils.Time;
  * A mock network client for use testing code
  */
 public class MockClient implements KafkaClient {
+    public static final RequestMatcher ALWAYS_TRUE = new RequestMatcher() {
+        @Override
+        public boolean matches(ClientRequest request) {
+            return true;
+        }
+    };
 
     private class FutureResponse {
         public final Struct responseBody;
         public final boolean disconnected;
+        public final RequestMatcher requestMatcher;
 
-        public FutureResponse(Struct responseBody, boolean disconnected) {
+        public FutureResponse(Struct responseBody, boolean disconnected, RequestMatcher requestMatcher) {
             this.responseBody = responseBody;
             this.disconnected = disconnected;
+            this.requestMatcher = requestMatcher;
         }
+
     }
 
     private final Time time;
@@ -94,6 +103,9 @@ public class MockClient implements KafkaClient {
     public void send(ClientRequest request) {
         if (!futureResponses.isEmpty()) {
             FutureResponse futureResp = futureResponses.poll();
+            if (!futureResp.requestMatcher.matches(request))
+                throw new IllegalStateException("Next in line response did not match expected request");
+
             ClientResponse resp = new ClientResponse(request, time.milliseconds(), futureResp.disconnected, futureResp.responseBody);
             responses.add(resp);
         } else {
@@ -141,11 +153,32 @@ public class MockClient implements KafkaClient {
     }
 
     public void prepareResponse(Struct body) {
-        prepareResponse(body, false);
+        prepareResponse(ALWAYS_TRUE, body, false);
+    }
+
+    /**
+     * Prepare a response for a request matching the provided matcher. If the matcher does not
+     * match, {@link #send(ClientRequest)} will throw IllegalStateException
+     * @param matcher The matcher to apply
+     * @param body The response body
+     */
+    public void prepareResponse(RequestMatcher matcher, Struct body) {
+        prepareResponse(matcher, body, false);
     }
 
     public void prepareResponse(Struct body, boolean disconnected) {
-        futureResponses.add(new FutureResponse(body, disconnected));
+        prepareResponse(ALWAYS_TRUE, body, disconnected);
+    }
+
+    /**
+     * Prepare a response for a request matching the provided matcher. If the matcher does not
+     * match, {@link #send(ClientRequest)} will throw IllegalStateException
+     * @param matcher The matcher to apply
+     * @param body The response body
+     * @param disconnected Whether the request was disconnected
+     */
+    public void prepareResponse(RequestMatcher matcher, Struct body, boolean disconnected) {
+        futureResponses.add(new FutureResponse(body, disconnected, matcher));
     }
 
     public void setNode(Node node) {
@@ -180,4 +213,14 @@ public class MockClient implements KafkaClient {
         return this.node;
     }
 
+    /**
+     * The RequestMatcher provides a way to match a particular request to a response prepared
+     * through {@link #prepareResponse(RequestMatcher, Struct)}. Basically this allows testers
+     * to inspect the request body for the type of the request or for specific fields that should be set,
+     * and to fail the test if it doesn't match.
+     */
+    public interface RequestMatcher {
+        boolean matches(ClientRequest request);
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/f6373e4d/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index 56850bb..a7c83ca 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.clients.consumer.internals;
 
+import org.apache.kafka.clients.ClientRequest;
 import org.apache.kafka.clients.Metadata;
 import org.apache.kafka.clients.MockClient;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -30,6 +31,8 @@ import org.apache.kafka.common.protocol.types.Struct;
 import org.apache.kafka.common.record.CompressionType;
 import org.apache.kafka.common.record.MemoryRecords;
 import org.apache.kafka.common.requests.FetchResponse;
+import org.apache.kafka.common.requests.ListOffsetRequest;
+import org.apache.kafka.common.requests.ListOffsetResponse;
 import org.apache.kafka.common.requests.MetadataResponse;
 import org.apache.kafka.common.serialization.ByteArrayDeserializer;
 import org.apache.kafka.common.utils.MockTime;
@@ -40,11 +43,13 @@ import org.junit.Test;
 import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
@@ -158,26 +163,34 @@ public class FetcherTest {
     }
 
     @Test
-    public void testFetchFailed() {
+    public void testFetchNotLeaderForPartition() {
         subscriptions.subscribe(tp);
         subscriptions.seek(tp, 0);
 
-        // fetch with not leader
         fetcher.initFetches(cluster);
         client.prepareResponse(fetchResponse(this.records.buffer(), Errors.NOT_LEADER_FOR_PARTITION.code(), 100L));
         consumerClient.poll(0);
         assertEquals(0, fetcher.fetchedRecords().size());
         assertEquals(0L, metadata.timeToNextUpdate(time.milliseconds()));
+    }
+
+    @Test
+    public void testFetchUnknownTopicOrPartition() {
+        subscriptions.subscribe(tp);
+        subscriptions.seek(tp, 0);
 
-        // fetch with unknown topic partition
         fetcher.initFetches(cluster);
         client.prepareResponse(fetchResponse(this.records.buffer(), Errors.UNKNOWN_TOPIC_OR_PARTITION.code(), 100L));
         consumerClient.poll(0);
         assertEquals(0, fetcher.fetchedRecords().size());
         assertEquals(0L, metadata.timeToNextUpdate(time.milliseconds()));
+    }
+
+    @Test
+    public void testFetchOffsetOutOfRange() {
+        subscriptions.subscribe(tp);
+        subscriptions.seek(tp, 0);
 
-        // fetch with out of range
-        subscriptions.fetched(tp, 5);
         fetcher.initFetches(cluster);
         client.prepareResponse(fetchResponse(this.records.buffer(), Errors.OFFSET_OUT_OF_RANGE.code(), 100L));
         consumerClient.poll(0);
@@ -188,18 +201,94 @@ public class FetcherTest {
     }
 
     @Test
-    public void testFetchOutOfRange() {
+    public void testFetchDisconnected() {
         subscriptions.subscribe(tp);
-        subscriptions.seek(tp, 5);
+        subscriptions.seek(tp, 0);
 
-        // fetch with out of range
         fetcher.initFetches(cluster);
-        client.prepareResponse(fetchResponse(this.records.buffer(), Errors.OFFSET_OUT_OF_RANGE.code(), 100L));
+        client.prepareResponse(fetchResponse(this.records.buffer(), Errors.NONE.code(), 100L), true);
         consumerClient.poll(0);
-        assertTrue(subscriptions.isOffsetResetNeeded(tp));
         assertEquals(0, fetcher.fetchedRecords().size());
-        assertEquals(null, subscriptions.fetched(tp));
-        assertEquals(null, subscriptions.consumed(tp));
+
+        // disconnects should have no effect on subscription state
+        assertFalse(subscriptions.isOffsetResetNeeded(tp));
+        assertTrue(subscriptions.isFetchable(tp));
+        assertEquals(0, (long) subscriptions.fetched(tp));
+        assertEquals(0, (long) subscriptions.consumed(tp));
+    }
+
+    @Test
+    public void testUpdateFetchPositionToCommitted() {
+        // unless a specific reset is expected, the default behavior is to reset to the committed
+        // position if one is present
+        subscriptions.subscribe(tp);
+        subscriptions.committed(tp, 5);
+
+        fetcher.updateFetchPositions(Collections.singleton(tp));
+        assertTrue(subscriptions.isFetchable(tp));
+        assertEquals(5, (long) subscriptions.fetched(tp));
+        assertEquals(5, (long) subscriptions.consumed(tp));
+    }
+
+    @Test
+    public void testUpdateFetchPositionResetToDefaultOffset() {
+        subscriptions.subscribe(tp);
+        // with no commit position, we should reset using the default strategy defined above (EARLIEST)
+
+        client.prepareResponse(listOffsetRequestMatcher(Fetcher.EARLIEST_OFFSET_TIMESTAMP),
+                listOffsetResponse(Errors.NONE, Arrays.asList(5L)));
+        fetcher.updateFetchPositions(Collections.singleton(tp));
+        assertFalse(subscriptions.isOffsetResetNeeded(tp));
+        assertTrue(subscriptions.isFetchable(tp));
+        assertEquals(5, (long) subscriptions.fetched(tp));
+        assertEquals(5, (long) subscriptions.consumed(tp));
+    }
+
+    @Test
+    public void testUpdateFetchPositionResetToLatestOffset() {
+        subscriptions.subscribe(tp);
+        subscriptions.needOffsetReset(tp, OffsetResetStrategy.LATEST);
+
+        client.prepareResponse(listOffsetRequestMatcher(Fetcher.LATEST_OFFSET_TIMESTAMP),
+                listOffsetResponse(Errors.NONE, Arrays.asList(5L)));
+        fetcher.updateFetchPositions(Collections.singleton(tp));
+        assertFalse(subscriptions.isOffsetResetNeeded(tp));
+        assertTrue(subscriptions.isFetchable(tp));
+        assertEquals(5, (long) subscriptions.fetched(tp));
+        assertEquals(5, (long) subscriptions.consumed(tp));
+    }
+
+    @Test
+    public void testUpdateFetchPositionResetToEarliestOffset() {
+        subscriptions.subscribe(tp);
+        subscriptions.needOffsetReset(tp, OffsetResetStrategy.EARLIEST);
+
+        client.prepareResponse(listOffsetRequestMatcher(Fetcher.EARLIEST_OFFSET_TIMESTAMP),
+                listOffsetResponse(Errors.NONE, Arrays.asList(5L)));
+        fetcher.updateFetchPositions(Collections.singleton(tp));
+        assertFalse(subscriptions.isOffsetResetNeeded(tp));
+        assertTrue(subscriptions.isFetchable(tp));
+        assertEquals(5, (long) subscriptions.fetched(tp));
+        assertEquals(5, (long) subscriptions.consumed(tp));
+    }
+
+    @Test
+    public void testUpdateFetchPositionDisconnect() {
+        subscriptions.subscribe(tp);
+        subscriptions.needOffsetReset(tp, OffsetResetStrategy.LATEST);
+
+        // First request gets a disconnect
+        client.prepareResponse(listOffsetRequestMatcher(Fetcher.LATEST_OFFSET_TIMESTAMP),
+                listOffsetResponse(Errors.NONE, Arrays.asList(5L)), true);
+
+        // Next one succeeds
+        client.prepareResponse(listOffsetRequestMatcher(Fetcher.LATEST_OFFSET_TIMESTAMP),
+                listOffsetResponse(Errors.NONE, Arrays.asList(5L)));
+        fetcher.updateFetchPositions(Collections.singleton(tp));
+        assertFalse(subscriptions.isOffsetResetNeeded(tp));
+        assertTrue(subscriptions.isFetchable(tp));
+        assertEquals(5, (long) subscriptions.fetched(tp));
+        assertEquals(5, (long) subscriptions.consumed(tp));
     }
 
     @Test
@@ -213,6 +302,26 @@ public class FetcherTest {
         assertEquals(cluster.topics().size(), allTopics.size());
     }
 
+    private MockClient.RequestMatcher listOffsetRequestMatcher(final long timestamp) {
+        // matches any list offset request with the provided timestamp
+        return new MockClient.RequestMatcher() {
+            @Override
+            public boolean matches(ClientRequest request) {
+                ListOffsetRequest req = new ListOffsetRequest(request.request().body());
+                ListOffsetRequest.PartitionData partitionData = req.offsetData().get(tp);
+                return partitionData != null && partitionData.timestamp == timestamp;
+            }
+        };
+    }
+
+    private Struct listOffsetResponse(Errors error, List<Long> offsets) {
+        ListOffsetResponse.PartitionData partitionData = new ListOffsetResponse.PartitionData(error.code(), offsets);
+        Map<TopicPartition, ListOffsetResponse.PartitionData> allPartitionData = new HashMap<>();
+        allPartitionData.put(tp, partitionData);
+        ListOffsetResponse response = new ListOffsetResponse(allPartitionData);
+        return response.toStruct();
+    }
+
     private Struct fetchResponse(ByteBuffer buffer, short error, long hw) {
         FetchResponse response = new FetchResponse(Collections.singletonMap(tp, new FetchResponse.PartitionData(error, hw, buffer)));
         return response.toStruct();