You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2016/10/14 22:36:56 UTC
kafka git commit: KAFKA-4303;
Ensure commitSync does not block unnecessarily in poll without
in-flight requests
Repository: kafka
Updated Branches:
refs/heads/trunk e7663a306 -> 6199c6277
KAFKA-4303; Ensure commitSync does not block unnecessarily in poll without in-flight requests
Author: Jason Gustafson <ja...@confluent.io>
Reviewers: Guozhang Wang <wa...@gmail.com>, Ismael Juma <is...@juma.me.uk>
Closes #2031 from hachikuji/KAFKA-4303
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/6199c627
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/6199c627
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/6199c627
Branch: refs/heads/trunk
Commit: 6199c62776bf3ce9467703ca651a0119b261e60e
Parents: e7663a3
Author: Jason Gustafson <ja...@confluent.io>
Authored: Fri Oct 14 15:36:43 2016 -0700
Committer: Jason Gustafson <ja...@confluent.io>
Committed: Fri Oct 14 15:36:43 2016 -0700
----------------------------------------------------------------------
.../apache/kafka/clients/InFlightRequests.java | 16 +++++++-----
.../kafka/clients/consumer/KafkaConsumer.java | 8 +-----
.../internals/ConsumerNetworkClient.java | 8 ++++--
.../clients/consumer/internals/Fetcher.java | 16 +-----------
.../internals/ConsumerNetworkClientTest.java | 27 +++++++++++++++++++-
.../clients/consumer/internals/FetcherTest.java | 4 ---
6 files changed, 43 insertions(+), 36 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/6199c627/clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java b/clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java
index 8de19ee..91b9dba 100644
--- a/clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java
+++ b/clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java
@@ -26,7 +26,7 @@ import java.util.Map;
final class InFlightRequests {
private final int maxInFlightRequestsPerConnection;
- private final Map<String, Deque<ClientRequest>> requests = new HashMap<String, Deque<ClientRequest>>();
+ private final Map<String, Deque<ClientRequest>> requests = new HashMap<>();
public InFlightRequests(int maxInFlightRequestsPerConnection) {
this.maxInFlightRequestsPerConnection = maxInFlightRequestsPerConnection;
@@ -133,14 +133,16 @@ final class InFlightRequests {
* @return list of nodes
*/
public List<String> getNodesWithTimedOutRequests(long now, int requestTimeout) {
- List<String> nodeIds = new LinkedList<String>();
- for (String nodeId : requests.keySet()) {
- if (inFlightRequestCount(nodeId) > 0) {
- ClientRequest request = requests.get(nodeId).peekLast();
+ List<String> nodeIds = new LinkedList<>();
+ for (Map.Entry<String, Deque<ClientRequest>> requestEntry : requests.entrySet()) {
+ String nodeId = requestEntry.getKey();
+ Deque<ClientRequest> deque = requestEntry.getValue();
+
+ if (!deque.isEmpty()) {
+ ClientRequest request = deque.peekLast();
long timeSinceSend = now - request.sendTimeMs();
- if (timeSinceSend > requestTimeout) {
+ if (timeSinceSend > requestTimeout)
nodeIds.add(nodeId);
- }
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/6199c627/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index b2b4bf0..b384211 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -1025,12 +1025,6 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
// send any new fetches (won't resend pending fetches)
fetcher.sendFetches();
- // if no fetches could be sent at the moment (which can happen if a partition leader is in the
- // blackout period following a disconnect, or if the partition leader is unknown), then we don't
- // block for longer than the retry backoff duration.
- if (!fetcher.hasInFlightFetches())
- timeout = Math.min(timeout, retryBackoffMs);
-
long now = time.milliseconds();
long pollTimeout = Math.min(coordinator.timeToNextPoll(now), timeout);
@@ -1039,7 +1033,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
public boolean shouldBlock() {
// since a fetch might be completed by the background thread, we need this poll condition
// to ensure that we do not block unnecessarily in poll()
- return !fetcher.hasCompletedFetches() && fetcher.hasInFlightFetches();
+ return !fetcher.hasCompletedFetches();
}
});
http://git-wip-us.apache.org/repos/asf/kafka/blob/6199c627/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
index 21fe0b8..2495b23 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
@@ -47,6 +47,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
*/
public class ConsumerNetworkClient implements Closeable {
private static final Logger log = LoggerFactory.getLogger(ConsumerNetworkClient.class);
+ private static final long MAX_POLL_TIMEOUT_MS = 5000L;
// the mutable state of this class is protected by the object's monitor (excluding the wakeup
// flag and the request completion queue below).
@@ -176,7 +177,7 @@ public class ConsumerNetworkClient implements Closeable {
*/
public void poll(RequestFuture<?> future) {
while (!future.isDone())
- poll(Long.MAX_VALUE, time.milliseconds(), future);
+ poll(MAX_POLL_TIMEOUT_MS, time.milliseconds(), future);
}
/**
@@ -225,7 +226,10 @@ public class ConsumerNetworkClient implements Closeable {
// condition becomes satisfied after the call to shouldBlock() (because of a fired completion
// handler), the client will be woken up.
if (pollCondition == null || pollCondition.shouldBlock()) {
- client.poll(timeout, now);
+ // if there are no requests in flight, do not block longer than the retry backoff
+ if (client.inFlightRequestCount() == 0)
+ timeout = Math.min(timeout, retryBackoffMs);
+ client.poll(Math.min(MAX_POLL_TIMEOUT_MS, timeout), now);
now = time.milliseconds();
} else {
client.poll(0, now);
http://git-wip-us.apache.org/repos/asf/kafka/blob/6199c627/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index 9e9ae92..bfc1a0b 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -17,6 +17,7 @@ import org.apache.kafka.clients.ClientResponse;
import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.NoOffsetForPartitionException;
+import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.clients.consumer.OffsetOutOfRangeException;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.Cluster;
@@ -41,7 +42,6 @@ import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.InvalidRecordException;
import org.apache.kafka.common.record.LogEntry;
import org.apache.kafka.common.record.MemoryRecords;
-import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.record.Record;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.requests.FetchRequest;
@@ -91,7 +91,6 @@ public class Fetcher<K, V> {
private final FetchManagerMetrics sensors;
private final SubscriptionState subscriptions;
private final ConcurrentLinkedQueue<CompletedFetch> completedFetches;
- private final AtomicInteger numInFlightFetches = new AtomicInteger(0);
private final Deserializer<K> keyDeserializer;
private final Deserializer<V> valueDeserializer;
@@ -137,15 +136,6 @@ public class Fetcher<K, V> {
return !completedFetches.isEmpty();
}
- /**
- * Check whether there are in-flight fetches. This is used to avoid unnecessary blocking in
- * {@link ConsumerNetworkClient#poll(long)} if there are no fetches to wait for. This method is thread-safe.
- * @return true if there are, false otherwise
- */
- public boolean hasInFlightFetches() {
- return numInFlightFetches.get() > 0;
- }
-
private boolean matchesRequestedPartitions(FetchRequest request, FetchResponse response) {
Set<TopicPartition> requestedPartitions = request.fetchData().keySet();
Set<TopicPartition> fetchedPartitions = response.responseData().keySet();
@@ -161,13 +151,10 @@ public class Fetcher<K, V> {
final FetchRequest request = fetchEntry.getValue();
final Node fetchTarget = fetchEntry.getKey();
- numInFlightFetches.incrementAndGet();
client.send(fetchTarget, ApiKeys.FETCH, request)
.addListener(new RequestFutureListener<ClientResponse>() {
@Override
public void onSuccess(ClientResponse resp) {
- numInFlightFetches.decrementAndGet();
-
FetchResponse response = new FetchResponse(resp.responseBody());
if (!matchesRequestedPartitions(request, response)) {
// obviously we expect the broker to always send us valid responses, so this check
@@ -194,7 +181,6 @@ public class Fetcher<K, V> {
@Override
public void onFailure(RuntimeException e) {
- numInFlightFetches.decrementAndGet();
log.debug("Fetch request to {} failed", fetchTarget, e);
}
});
http://git-wip-us.apache.org/repos/asf/kafka/blob/6199c627/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java
index f90cd63..f8ad3ca 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java
@@ -100,10 +100,35 @@ public class ConsumerNetworkClientTest {
@Test
public void blockWhenPollConditionNotSatisfied() {
+ long timeout = 4000L;
+
NetworkClient mockNetworkClient = EasyMock.mock(NetworkClient.class);
ConsumerNetworkClient consumerClient = new ConsumerNetworkClient(mockNetworkClient, metadata, time, 100, 1000);
- EasyMock.expect(mockNetworkClient.poll(EasyMock.eq(Long.MAX_VALUE), EasyMock.anyLong())).andReturn(Collections.<ClientResponse>emptyList());
+ EasyMock.expect(mockNetworkClient.inFlightRequestCount()).andReturn(1);
+ EasyMock.expect(mockNetworkClient.poll(EasyMock.eq(timeout), EasyMock.anyLong())).andReturn(Collections.<ClientResponse>emptyList());
+
+ EasyMock.replay(mockNetworkClient);
+
+ consumerClient.poll(timeout, time.milliseconds(), new ConsumerNetworkClient.PollCondition() {
+ @Override
+ public boolean shouldBlock() {
+ return true;
+ }
+ });
+
+ EasyMock.verify(mockNetworkClient);
+ }
+
+ @Test
+ public void blockOnlyForRetryBackoffIfNoInflightRequests() {
+ long retryBackoffMs = 100L;
+
+ NetworkClient mockNetworkClient = EasyMock.mock(NetworkClient.class);
+ ConsumerNetworkClient consumerClient = new ConsumerNetworkClient(mockNetworkClient, metadata, time, retryBackoffMs, 1000L);
+
+ EasyMock.expect(mockNetworkClient.inFlightRequestCount()).andReturn(0);
+ EasyMock.expect(mockNetworkClient.poll(EasyMock.eq(retryBackoffMs), EasyMock.anyLong())).andReturn(Collections.<ClientResponse>emptyList());
EasyMock.replay(mockNetworkClient);
http://git-wip-us.apache.org/repos/asf/kafka/blob/6199c627/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index faf6efa..5822646 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -128,13 +128,11 @@ public class FetcherTest {
// normal fetch
fetcher.sendFetches();
- assertTrue(fetcher.hasInFlightFetches());
assertFalse(fetcher.hasCompletedFetches());
client.prepareResponse(fetchResponse(this.records.buffer(), Errors.NONE.code(), 100L, 0));
consumerClient.poll(0);
assertTrue(fetcher.hasCompletedFetches());
- assertFalse(fetcher.hasInFlightFetches());
Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> partitionRecords = fetcher.fetchedRecords();
assertTrue(partitionRecords.containsKey(tp));
@@ -155,13 +153,11 @@ public class FetcherTest {
subscriptions.seek(tp, 0);
fetcher.sendFetches();
- assertTrue(fetcher.hasInFlightFetches());
assertFalse(fetcher.hasCompletedFetches());
client.prepareResponse(fetchResponse(this.records.buffer(), Errors.NOT_LEADER_FOR_PARTITION.code(), 100L, 0));
consumerClient.poll(0);
assertTrue(fetcher.hasCompletedFetches());
- assertFalse(fetcher.hasInFlightFetches());
Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> partitionRecords = fetcher.fetchedRecords();
assertFalse(partitionRecords.containsKey(tp));