You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2017/03/08 23:17:04 UTC
kafka git commit: MINOR: Use ConcurrentMap for ConsumerNetworkClient
UnsentRequests
Repository: kafka
Updated Branches:
refs/heads/trunk f7354e779 -> 022bf1295
MINOR: Use ConcurrentMap for ConsumerNetworkClient UnsentRequests
Author: Jason Gustafson <ja...@confluent.io>
Reviewers: Ismael Juma <is...@juma.me.uk>
Closes #2656 from hachikuji/minor-cleanup-unsent-requests
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/022bf129
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/022bf129
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/022bf129
Branch: refs/heads/trunk
Commit: 022bf129518e33e165f9ceefc4ab9e622952d3bd
Parents: f7354e7
Author: Jason Gustafson <ja...@confluent.io>
Authored: Wed Mar 8 23:16:53 2017 +0000
Committer: Ismael Juma <is...@juma.me.uk>
Committed: Wed Mar 8 23:16:53 2017 +0000
----------------------------------------------------------------------
.../kafka/clients/consumer/KafkaConsumer.java | 2 +-
.../internals/ConsumerNetworkClient.java | 95 ++++++++++++--------
.../clients/consumer/internals/Fetcher.java | 2 +-
3 files changed, 58 insertions(+), 41 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/022bf129/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 51b00af..612f446 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -1004,7 +1004,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
//
// NOTE: since the consumed position has already been updated, we must not allow
// wakeups or any other errors to be triggered prior to returning the fetched records.
- if (fetcher.sendFetches() > 0 || client.hasPendingRequest())
+ if (fetcher.sendFetches() > 0 || client.hasPendingRequests())
client.pollNoWakeup();
if (this.interceptors == null)
http://git-wip-us.apache.org/repos/asf/kafka/blob/022bf129/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
index eb25359..478ed3f 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
@@ -37,11 +37,11 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
-import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
-import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.common.errors.InterruptException;
@@ -242,6 +242,9 @@ public class ConsumerNetworkClient implements Closeable {
// fail requests that couldn't be sent if they have expired
failExpiredRequests(now);
+
+ // clean unsent requests collection to keep the map from growing indefinitely
+ unsent.clean();
}
// called without the lock to avoid deadlock potential if handlers need to acquire locks
@@ -271,12 +274,12 @@ public class ConsumerNetworkClient implements Closeable {
long startMs = time.milliseconds();
long remainingMs = timeoutMs;
- while (hasPendingRequest(node) && remainingMs > 0) {
+ while (hasPendingRequests(node) && remainingMs > 0) {
poll(remainingMs);
remainingMs = timeoutMs - (time.milliseconds() - startMs);
}
- return !hasPendingRequest(node);
+ return !hasPendingRequests(node);
}
/**
@@ -297,8 +300,8 @@ public class ConsumerNetworkClient implements Closeable {
* @param node The node in question
* @return A boolean indicating whether there is a pending request
*/
- public boolean hasPendingRequest(Node node) {
- if (unsent.hasRequest(node))
+ public boolean hasPendingRequests(Node node) {
+ if (unsent.hasRequests(node))
return true;
synchronized (this) {
return client.inFlightRequestCount(node.idString()) > 0;
@@ -321,8 +324,8 @@ public class ConsumerNetworkClient implements Closeable {
* have been transmitted (i.e. in-flight requests) and those which are awaiting transmission.
* @return A boolean indicating whether there are pending requests
*/
- public boolean hasPendingRequest() {
- if (unsent.hasRequest())
+ public boolean hasPendingRequests() {
+ if (unsent.hasRequests())
return true;
synchronized (this) {
return client.inFlightRequestCount() > 0;
@@ -350,8 +353,7 @@ public class ConsumerNetworkClient implements Closeable {
// by NetworkClient, so we just need to check whether connections for any of the unsent
// requests have been disconnected; if they have, then we complete the corresponding future
// and set the disconnect flag in the ClientResponse
- List<Node> nodes = unsent.nodes();
- for (Node node : nodes) {
+ for (Node node : unsent.nodes()) {
if (client.connectionFailed(node)) {
// Remove entry before invoking request callback to avoid callbacks handling
// coordinator failures traversing the unsent list again.
@@ -368,7 +370,7 @@ public class ConsumerNetworkClient implements Closeable {
private void failExpiredRequests(long now) {
// clear all expired unsent requests and fail their corresponding futures
- List<ClientRequest> expiredRequests = unsent.removeExpiredRequests(now, unsentExpiryMs);
+ Collection<ClientRequest> expiredRequests = unsent.removeExpiredRequests(now, unsentExpiryMs);
for (ClientRequest request : expiredRequests) {
RequestFutureCompletionHandler handler = (RequestFutureCompletionHandler) request.callback();
handler.onFailure(new TimeoutException("Failed to send request after " + unsentExpiryMs + " ms."));
@@ -471,12 +473,12 @@ public class ConsumerNetworkClient implements Closeable {
}
}
- public class RequestFutureCompletionHandler implements RequestCompletionHandler {
+ private class RequestFutureCompletionHandler implements RequestCompletionHandler {
private final RequestFuture<ClientResponse> future;
private ClientResponse response;
private RuntimeException e;
- public RequestFutureCompletionHandler() {
+ private RequestFutureCompletionHandler() {
this.future = new RequestFuture<>();
}
@@ -525,55 +527,55 @@ public class ConsumerNetworkClient implements Closeable {
boolean shouldBlock();
}
-
/*
* A threadsafe helper class to hold requests per node that have not been sent yet
*/
private final static class UnsentRequests {
- private final Map<Node, ConcurrentLinkedQueue<ClientRequest>> unsent;
+ private final ConcurrentMap<Node, ConcurrentLinkedQueue<ClientRequest>> unsent;
- public UnsentRequests() {
- unsent = new HashMap<>();
+ private UnsentRequests() {
+ unsent = new ConcurrentHashMap<>();
}
- public synchronized void put(Node node, ClientRequest request) {
- ConcurrentLinkedQueue<ClientRequest> nodeUnsent = unsent.get(node);
- if (nodeUnsent == null) {
- nodeUnsent = new ConcurrentLinkedQueue<>();
- unsent.put(node, nodeUnsent);
+ public void put(Node node, ClientRequest request) {
+ // the lock protects the put from a concurrent removal of the queue for the node
+ synchronized (unsent) {
+ ConcurrentLinkedQueue<ClientRequest> requests = unsent.get(node);
+ if (requests == null) {
+ requests = new ConcurrentLinkedQueue<>();
+ unsent.put(node, requests);
+ }
+ requests.add(request);
}
- nodeUnsent.add(request);
}
- public synchronized int requestCount(Node node) {
+ public int requestCount(Node node) {
ConcurrentLinkedQueue<ClientRequest> requests = unsent.get(node);
return requests == null ? 0 : requests.size();
}
- public synchronized int requestCount() {
+ public int requestCount() {
int total = 0;
for (ConcurrentLinkedQueue<ClientRequest> requests : unsent.values())
total += requests.size();
return total;
}
- public synchronized boolean hasRequest(Node node) {
+ public boolean hasRequests(Node node) {
ConcurrentLinkedQueue<ClientRequest> requests = unsent.get(node);
return requests != null && !requests.isEmpty();
}
- public synchronized boolean hasRequest() {
+ public boolean hasRequests() {
for (ConcurrentLinkedQueue<ClientRequest> requests : unsent.values())
if (!requests.isEmpty())
return true;
return false;
}
- public synchronized List<ClientRequest> removeExpiredRequests(long now, long unsentExpiryMs) {
+ public Collection<ClientRequest> removeExpiredRequests(long now, long unsentExpiryMs) {
List<ClientRequest> expiredRequests = new ArrayList<>();
- Iterator<ConcurrentLinkedQueue<ClientRequest>> iterator = unsent.values().iterator();
- while (iterator.hasNext()) {
- ConcurrentLinkedQueue<ClientRequest> requests = iterator.next();
+ for (ConcurrentLinkedQueue<ClientRequest> requests : unsent.values()) {
Iterator<ClientRequest> requestIterator = requests.iterator();
while (requestIterator.hasNext()) {
ClientRequest request = requestIterator.next();
@@ -583,24 +585,39 @@ public class ConsumerNetworkClient implements Closeable {
} else
break;
}
- if (requests.isEmpty())
- iterator.remove();
}
return expiredRequests;
}
- public synchronized Collection<ClientRequest> remove(Node node) {
- ConcurrentLinkedQueue<ClientRequest> requests = unsent.remove(node);
- return requests == null ? Collections.<ClientRequest>emptyList() : requests;
+ public void clean() {
+ // the lock protects removal from a concurrent put which could otherwise mutate the
+ // queue after it has been removed from the map
+ synchronized (unsent) {
+ Iterator<ConcurrentLinkedQueue<ClientRequest>> iterator = unsent.values().iterator();
+ while (iterator.hasNext()) {
+ ConcurrentLinkedQueue<ClientRequest> requests = iterator.next();
+ if (requests.isEmpty())
+ iterator.remove();
+ }
+ }
+ }
+
+ public Collection<ClientRequest> remove(Node node) {
+ // the lock protects removal from a concurrent put which could otherwise mutate the
+ // queue after it has been removed from the map
+ synchronized (unsent) {
+ ConcurrentLinkedQueue<ClientRequest> requests = unsent.remove(node);
+ return requests == null ? Collections.<ClientRequest>emptyList() : requests;
+ }
}
- public synchronized Iterator<ClientRequest> requestIterator(Node node) {
+ public Iterator<ClientRequest> requestIterator(Node node) {
ConcurrentLinkedQueue<ClientRequest> requests = unsent.get(node);
return requests == null ? Collections.<ClientRequest>emptyIterator() : requests.iterator();
}
- public synchronized List<Node> nodes() {
- return new ArrayList<>(unsent.keySet());
+ public Collection<Node> nodes() {
+ return unsent.keySet();
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/022bf129/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index 536e4e8..441206a 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -713,7 +713,7 @@ public class Fetcher<K, V> implements SubscriptionState.Listener {
Node node = cluster.leaderFor(partition);
if (node == null) {
metadata.requestUpdate();
- } else if (!this.client.hasPendingRequest(node)) {
+ } else if (!this.client.hasPendingRequests(node)) {
// if there is a leader and no in-flight requests, issue a new fetch
LinkedHashMap<TopicPartition, FetchRequest.PartitionData> fetch = fetchable.get(node);
if (fetch == null) {