You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2015/08/07 00:51:40 UTC
kafka git commit: KAFKA-2340: Improve KafkaConsumer Fetcher test coverage
Repository: kafka
Updated Branches:
refs/heads/trunk 006b45c7e -> f6373e4d9
KAFKA-2340: Improve KafkaConsumer Fetcher test coverage
Author: Jason Gustafson <ja...@confluent.io>
Reviewers: Guozhang Wang
Closes #112 from hachikuji/KAFKA-2340 and squashes the following commits:
cc49ca2 [Jason Gustafson] KAFKA-2340; improve KafkaConsumer Fetcher test coverage
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/f6373e4d
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/f6373e4d
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/f6373e4d
Branch: refs/heads/trunk
Commit: f6373e4d9929d123a8474ab7673ee701f63ac593
Parents: 006b45c
Author: Jason Gustafson <ja...@confluent.io>
Authored: Thu Aug 6 15:52:43 2015 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Thu Aug 6 15:52:43 2015 -0700
----------------------------------------------------------------------
.../clients/consumer/internals/Fetcher.java | 4 +-
.../org/apache/kafka/clients/MockClient.java | 49 ++++++-
.../clients/consumer/internals/FetcherTest.java | 133 +++++++++++++++++--
3 files changed, 169 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/f6373e4d/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index 956197b..9dc6697 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -61,8 +61,8 @@ import java.util.Set;
* This class manage the fetching process with the brokers.
*/
public class Fetcher<K, V> {
- private static final long EARLIEST_OFFSET_TIMESTAMP = -2L;
- private static final long LATEST_OFFSET_TIMESTAMP = -1L;
+ public static final long EARLIEST_OFFSET_TIMESTAMP = -2L;
+ public static final long LATEST_OFFSET_TIMESTAMP = -1L;
private static final Logger log = LoggerFactory.getLogger(Fetcher.class);
http://git-wip-us.apache.org/repos/asf/kafka/blob/f6373e4d/clients/src/test/java/org/apache/kafka/clients/MockClient.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/MockClient.java b/clients/src/test/java/org/apache/kafka/clients/MockClient.java
index d9c97e9..9133d85 100644
--- a/clients/src/test/java/org/apache/kafka/clients/MockClient.java
+++ b/clients/src/test/java/org/apache/kafka/clients/MockClient.java
@@ -34,15 +34,24 @@ import org.apache.kafka.common.utils.Time;
* A mock network client for use testing code
*/
public class MockClient implements KafkaClient {
+ public static final RequestMatcher ALWAYS_TRUE = new RequestMatcher() {
+ @Override
+ public boolean matches(ClientRequest request) {
+ return true;
+ }
+ };
private class FutureResponse {
public final Struct responseBody;
public final boolean disconnected;
+ public final RequestMatcher requestMatcher;
- public FutureResponse(Struct responseBody, boolean disconnected) {
+ public FutureResponse(Struct responseBody, boolean disconnected, RequestMatcher requestMatcher) {
this.responseBody = responseBody;
this.disconnected = disconnected;
+ this.requestMatcher = requestMatcher;
}
+
}
private final Time time;
@@ -94,6 +103,9 @@ public class MockClient implements KafkaClient {
public void send(ClientRequest request) {
if (!futureResponses.isEmpty()) {
FutureResponse futureResp = futureResponses.poll();
+ if (!futureResp.requestMatcher.matches(request))
+ throw new IllegalStateException("Next in line response did not match expected request");
+
ClientResponse resp = new ClientResponse(request, time.milliseconds(), futureResp.disconnected, futureResp.responseBody);
responses.add(resp);
} else {
@@ -141,11 +153,32 @@ public class MockClient implements KafkaClient {
}
public void prepareResponse(Struct body) {
- prepareResponse(body, false);
+ prepareResponse(ALWAYS_TRUE, body, false);
+ }
+
+ /**
+ * Prepare a response for a request matching the provided matcher. If the matcher does not
+ * match, {@link #send(ClientRequest)} will throw IllegalStateException
+ * @param matcher The matcher to apply
+ * @param body The response body
+ */
+ public void prepareResponse(RequestMatcher matcher, Struct body) {
+ prepareResponse(matcher, body, false);
}
public void prepareResponse(Struct body, boolean disconnected) {
- futureResponses.add(new FutureResponse(body, disconnected));
+ prepareResponse(ALWAYS_TRUE, body, disconnected);
+ }
+
+ /**
+ * Prepare a response for a request matching the provided matcher. If the matcher does not
+ * match, {@link #send(ClientRequest)} will throw IllegalStateException
+ * @param matcher The matcher to apply
+ * @param body The response body
+ * @param disconnected Whether the request was disconnected
+ */
+ public void prepareResponse(RequestMatcher matcher, Struct body, boolean disconnected) {
+ futureResponses.add(new FutureResponse(body, disconnected, matcher));
}
public void setNode(Node node) {
@@ -180,4 +213,14 @@ public class MockClient implements KafkaClient {
return this.node;
}
+ /**
+ * The RequestMatcher provides a way to match a particular request to a response prepared
+ * through {@link #prepareResponse(RequestMatcher, Struct)}. Basically this allows testers
+ * to inspect the request body for the type of the request or for specific fields that should be set,
+ * and to fail the test if it doesn't match.
+ */
+ public interface RequestMatcher {
+ boolean matches(ClientRequest request);
+ }
+
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/f6373e4d/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index 56850bb..a7c83ca 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.clients.consumer.internals;
+import org.apache.kafka.clients.ClientRequest;
import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.MockClient;
import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -30,6 +31,8 @@ import org.apache.kafka.common.protocol.types.Struct;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.requests.FetchResponse;
+import org.apache.kafka.common.requests.ListOffsetRequest;
+import org.apache.kafka.common.requests.ListOffsetResponse;
import org.apache.kafka.common.requests.MetadataResponse;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.utils.MockTime;
@@ -40,11 +43,13 @@ import org.junit.Test;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collections;
+import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
@@ -158,26 +163,34 @@ public class FetcherTest {
}
@Test
- public void testFetchFailed() {
+ public void testFetchNotLeaderForPartition() {
subscriptions.subscribe(tp);
subscriptions.seek(tp, 0);
- // fetch with not leader
fetcher.initFetches(cluster);
client.prepareResponse(fetchResponse(this.records.buffer(), Errors.NOT_LEADER_FOR_PARTITION.code(), 100L));
consumerClient.poll(0);
assertEquals(0, fetcher.fetchedRecords().size());
assertEquals(0L, metadata.timeToNextUpdate(time.milliseconds()));
+ }
+
+ @Test
+ public void testFetchUnknownTopicOrPartition() {
+ subscriptions.subscribe(tp);
+ subscriptions.seek(tp, 0);
- // fetch with unknown topic partition
fetcher.initFetches(cluster);
client.prepareResponse(fetchResponse(this.records.buffer(), Errors.UNKNOWN_TOPIC_OR_PARTITION.code(), 100L));
consumerClient.poll(0);
assertEquals(0, fetcher.fetchedRecords().size());
assertEquals(0L, metadata.timeToNextUpdate(time.milliseconds()));
+ }
+
+ @Test
+ public void testFetchOffsetOutOfRange() {
+ subscriptions.subscribe(tp);
+ subscriptions.seek(tp, 0);
- // fetch with out of range
- subscriptions.fetched(tp, 5);
fetcher.initFetches(cluster);
client.prepareResponse(fetchResponse(this.records.buffer(), Errors.OFFSET_OUT_OF_RANGE.code(), 100L));
consumerClient.poll(0);
@@ -188,18 +201,94 @@ public class FetcherTest {
}
@Test
- public void testFetchOutOfRange() {
+ public void testFetchDisconnected() {
subscriptions.subscribe(tp);
- subscriptions.seek(tp, 5);
+ subscriptions.seek(tp, 0);
- // fetch with out of range
fetcher.initFetches(cluster);
- client.prepareResponse(fetchResponse(this.records.buffer(), Errors.OFFSET_OUT_OF_RANGE.code(), 100L));
+ client.prepareResponse(fetchResponse(this.records.buffer(), Errors.NONE.code(), 100L), true);
consumerClient.poll(0);
- assertTrue(subscriptions.isOffsetResetNeeded(tp));
assertEquals(0, fetcher.fetchedRecords().size());
- assertEquals(null, subscriptions.fetched(tp));
- assertEquals(null, subscriptions.consumed(tp));
+
+ // disconnects should have no effect on subscription state
+ assertFalse(subscriptions.isOffsetResetNeeded(tp));
+ assertTrue(subscriptions.isFetchable(tp));
+ assertEquals(0, (long) subscriptions.fetched(tp));
+ assertEquals(0, (long) subscriptions.consumed(tp));
+ }
+
+ @Test
+ public void testUpdateFetchPositionToCommitted() {
+ // unless a specific reset is expected, the default behavior is to reset to the committed
+ // position if one is present
+ subscriptions.subscribe(tp);
+ subscriptions.committed(tp, 5);
+
+ fetcher.updateFetchPositions(Collections.singleton(tp));
+ assertTrue(subscriptions.isFetchable(tp));
+ assertEquals(5, (long) subscriptions.fetched(tp));
+ assertEquals(5, (long) subscriptions.consumed(tp));
+ }
+
+ @Test
+ public void testUpdateFetchPositionResetToDefaultOffset() {
+ subscriptions.subscribe(tp);
+ // with no commit position, we should reset using the default strategy defined above (EARLIEST)
+
+ client.prepareResponse(listOffsetRequestMatcher(Fetcher.EARLIEST_OFFSET_TIMESTAMP),
+ listOffsetResponse(Errors.NONE, Arrays.asList(5L)));
+ fetcher.updateFetchPositions(Collections.singleton(tp));
+ assertFalse(subscriptions.isOffsetResetNeeded(tp));
+ assertTrue(subscriptions.isFetchable(tp));
+ assertEquals(5, (long) subscriptions.fetched(tp));
+ assertEquals(5, (long) subscriptions.consumed(tp));
+ }
+
+ @Test
+ public void testUpdateFetchPositionResetToLatestOffset() {
+ subscriptions.subscribe(tp);
+ subscriptions.needOffsetReset(tp, OffsetResetStrategy.LATEST);
+
+ client.prepareResponse(listOffsetRequestMatcher(Fetcher.LATEST_OFFSET_TIMESTAMP),
+ listOffsetResponse(Errors.NONE, Arrays.asList(5L)));
+ fetcher.updateFetchPositions(Collections.singleton(tp));
+ assertFalse(subscriptions.isOffsetResetNeeded(tp));
+ assertTrue(subscriptions.isFetchable(tp));
+ assertEquals(5, (long) subscriptions.fetched(tp));
+ assertEquals(5, (long) subscriptions.consumed(tp));
+ }
+
+ @Test
+ public void testUpdateFetchPositionResetToEarliestOffset() {
+ subscriptions.subscribe(tp);
+ subscriptions.needOffsetReset(tp, OffsetResetStrategy.EARLIEST);
+
+ client.prepareResponse(listOffsetRequestMatcher(Fetcher.EARLIEST_OFFSET_TIMESTAMP),
+ listOffsetResponse(Errors.NONE, Arrays.asList(5L)));
+ fetcher.updateFetchPositions(Collections.singleton(tp));
+ assertFalse(subscriptions.isOffsetResetNeeded(tp));
+ assertTrue(subscriptions.isFetchable(tp));
+ assertEquals(5, (long) subscriptions.fetched(tp));
+ assertEquals(5, (long) subscriptions.consumed(tp));
+ }
+
+ @Test
+ public void testUpdateFetchPositionDisconnect() {
+ subscriptions.subscribe(tp);
+ subscriptions.needOffsetReset(tp, OffsetResetStrategy.LATEST);
+
+ // First request gets a disconnect
+ client.prepareResponse(listOffsetRequestMatcher(Fetcher.LATEST_OFFSET_TIMESTAMP),
+ listOffsetResponse(Errors.NONE, Arrays.asList(5L)), true);
+
+ // Next one succeeds
+ client.prepareResponse(listOffsetRequestMatcher(Fetcher.LATEST_OFFSET_TIMESTAMP),
+ listOffsetResponse(Errors.NONE, Arrays.asList(5L)));
+ fetcher.updateFetchPositions(Collections.singleton(tp));
+ assertFalse(subscriptions.isOffsetResetNeeded(tp));
+ assertTrue(subscriptions.isFetchable(tp));
+ assertEquals(5, (long) subscriptions.fetched(tp));
+ assertEquals(5, (long) subscriptions.consumed(tp));
}
@Test
@@ -213,6 +302,26 @@ public class FetcherTest {
assertEquals(cluster.topics().size(), allTopics.size());
}
+ private MockClient.RequestMatcher listOffsetRequestMatcher(final long timestamp) {
+ // matches any list offset request with the provided timestamp
+ return new MockClient.RequestMatcher() {
+ @Override
+ public boolean matches(ClientRequest request) {
+ ListOffsetRequest req = new ListOffsetRequest(request.request().body());
+ ListOffsetRequest.PartitionData partitionData = req.offsetData().get(tp);
+ return partitionData != null && partitionData.timestamp == timestamp;
+ }
+ };
+ }
+
+ private Struct listOffsetResponse(Errors error, List<Long> offsets) {
+ ListOffsetResponse.PartitionData partitionData = new ListOffsetResponse.PartitionData(error.code(), offsets);
+ Map<TopicPartition, ListOffsetResponse.PartitionData> allPartitionData = new HashMap<>();
+ allPartitionData.put(tp, partitionData);
+ ListOffsetResponse response = new ListOffsetResponse(allPartitionData);
+ return response.toStruct();
+ }
+
private Struct fetchResponse(ByteBuffer buffer, short error, long hw) {
FetchResponse response = new FetchResponse(Collections.singletonMap(tp, new FetchResponse.PartitionData(error, hw, buffer)));
return response.toStruct();