You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2017/03/08 23:17:04 UTC

kafka git commit: MINOR: Use ConcurrentMap for ConsumerNetworkClient UnsentRequests

Repository: kafka
Updated Branches:
  refs/heads/trunk f7354e779 -> 022bf1295


MINOR: Use ConcurrentMap for ConsumerNetworkClient UnsentRequests

Author: Jason Gustafson <ja...@confluent.io>

Reviewers: Ismael Juma <is...@juma.me.uk>

Closes #2656 from hachikuji/minor-cleanup-unsent-requests


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/022bf129
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/022bf129
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/022bf129

Branch: refs/heads/trunk
Commit: 022bf129518e33e165f9ceefc4ab9e622952d3bd
Parents: f7354e7
Author: Jason Gustafson <ja...@confluent.io>
Authored: Wed Mar 8 23:16:53 2017 +0000
Committer: Ismael Juma <is...@juma.me.uk>
Committed: Wed Mar 8 23:16:53 2017 +0000

----------------------------------------------------------------------
 .../kafka/clients/consumer/KafkaConsumer.java   |  2 +-
 .../internals/ConsumerNetworkClient.java        | 95 ++++++++++++--------
 .../clients/consumer/internals/Fetcher.java     |  2 +-
 3 files changed, 58 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/022bf129/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 51b00af..612f446 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -1004,7 +1004,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
                     //
                     // NOTE: since the consumed position has already been updated, we must not allow
                     // wakeups or any other errors to be triggered prior to returning the fetched records.
-                    if (fetcher.sendFetches() > 0 || client.hasPendingRequest())
+                    if (fetcher.sendFetches() > 0 || client.hasPendingRequests())
                         client.pollNoWakeup();
 
                     if (this.interceptors == null)

http://git-wip-us.apache.org/repos/asf/kafka/blob/022bf129/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
index eb25359..478ed3f 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
@@ -37,11 +37,11 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
-import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.kafka.common.errors.InterruptException;
 
@@ -242,6 +242,9 @@ public class ConsumerNetworkClient implements Closeable {
 
             // fail requests that couldn't be sent if they have expired
             failExpiredRequests(now);
+
+            // clean unsent requests collection to keep the map from growing indefinitely
+            unsent.clean();
         }
 
         // called without the lock to avoid deadlock potential if handlers need to acquire locks
@@ -271,12 +274,12 @@ public class ConsumerNetworkClient implements Closeable {
         long startMs = time.milliseconds();
         long remainingMs = timeoutMs;
 
-        while (hasPendingRequest(node) && remainingMs > 0) {
+        while (hasPendingRequests(node) && remainingMs > 0) {
             poll(remainingMs);
             remainingMs = timeoutMs - (time.milliseconds() - startMs);
         }
 
-        return !hasPendingRequest(node);
+        return !hasPendingRequests(node);
     }
 
     /**
@@ -297,8 +300,8 @@ public class ConsumerNetworkClient implements Closeable {
      * @param node The node in question
      * @return A boolean indicating whether there is pending request
      */
-    public boolean hasPendingRequest(Node node) {
-        if (unsent.hasRequest(node))
+    public boolean hasPendingRequests(Node node) {
+        if (unsent.hasRequests(node))
             return true;
         synchronized (this) {
             return client.inFlightRequestCount(node.idString()) > 0;
@@ -321,8 +324,8 @@ public class ConsumerNetworkClient implements Closeable {
      * have been transmitted (i.e. in-flight requests) and those which are awaiting transmission.
      * @return A boolean indicating whether there is pending request
      */
-    public boolean hasPendingRequest() {
-        if (unsent.hasRequest())
+    public boolean hasPendingRequests() {
+        if (unsent.hasRequests())
             return true;
         synchronized (this) {
             return client.inFlightRequestCount() > 0;
@@ -350,8 +353,7 @@ public class ConsumerNetworkClient implements Closeable {
         // by NetworkClient, so we just need to check whether connections for any of the unsent
         // requests have been disconnected; if they have, then we complete the corresponding future
         // and set the disconnect flag in the ClientResponse
-        List<Node> nodes = unsent.nodes();
-        for (Node node : nodes) {
+        for (Node node : unsent.nodes()) {
             if (client.connectionFailed(node)) {
                 // Remove entry before invoking request callback to avoid callbacks handling
                 // coordinator failures traversing the unsent list again.
@@ -368,7 +370,7 @@ public class ConsumerNetworkClient implements Closeable {
 
     private void failExpiredRequests(long now) {
         // clear all expired unsent requests and fail their corresponding futures
-        List<ClientRequest> expiredRequests = unsent.removeExpiredRequests(now, unsentExpiryMs);
+        Collection<ClientRequest> expiredRequests = unsent.removeExpiredRequests(now, unsentExpiryMs);
         for (ClientRequest request : expiredRequests) {
             RequestFutureCompletionHandler handler = (RequestFutureCompletionHandler) request.callback();
             handler.onFailure(new TimeoutException("Failed to send request after " + unsentExpiryMs + " ms."));
@@ -471,12 +473,12 @@ public class ConsumerNetworkClient implements Closeable {
         }
     }
 
-    public class RequestFutureCompletionHandler implements RequestCompletionHandler {
+    private class RequestFutureCompletionHandler implements RequestCompletionHandler {
         private final RequestFuture<ClientResponse> future;
         private ClientResponse response;
         private RuntimeException e;
 
-        public RequestFutureCompletionHandler() {
+        private RequestFutureCompletionHandler() {
             this.future = new RequestFuture<>();
         }
 
@@ -525,55 +527,55 @@ public class ConsumerNetworkClient implements Closeable {
         boolean shouldBlock();
     }
 
-
     /*
      * A threadsafe helper class to hold requests per node that have not been sent yet
      */
     private final static class UnsentRequests {
-        private final Map<Node, ConcurrentLinkedQueue<ClientRequest>> unsent;
+        private final ConcurrentMap<Node, ConcurrentLinkedQueue<ClientRequest>> unsent;
 
-        public UnsentRequests() {
-            unsent = new HashMap<>();
+        private UnsentRequests() {
+            unsent = new ConcurrentHashMap<>();
         }
 
-        public synchronized void put(Node node, ClientRequest request) {
-            ConcurrentLinkedQueue<ClientRequest> nodeUnsent = unsent.get(node);
-            if (nodeUnsent == null) {
-                nodeUnsent = new ConcurrentLinkedQueue<>();
-                unsent.put(node, nodeUnsent);
+        public void put(Node node, ClientRequest request) {
+            // the lock protects the put from a concurrent removal of the queue for the node
+            synchronized (unsent) {
+                ConcurrentLinkedQueue<ClientRequest> requests = unsent.get(node);
+                if (requests == null) {
+                    requests = new ConcurrentLinkedQueue<>();
+                    unsent.put(node, requests);
+                }
+                requests.add(request);
             }
-            nodeUnsent.add(request);
         }
 
-        public synchronized int requestCount(Node node) {
+        public int requestCount(Node node) {
             ConcurrentLinkedQueue<ClientRequest> requests = unsent.get(node);
             return requests == null ? 0 : requests.size();
         }
 
-        public synchronized int requestCount() {
+        public int requestCount() {
             int total = 0;
             for (ConcurrentLinkedQueue<ClientRequest> requests : unsent.values())
                 total += requests.size();
             return total;
         }
 
-        public synchronized boolean hasRequest(Node node) {
+        public boolean hasRequests(Node node) {
             ConcurrentLinkedQueue<ClientRequest> requests = unsent.get(node);
             return requests != null && !requests.isEmpty();
         }
 
-        public synchronized boolean hasRequest() {
+        public boolean hasRequests() {
             for (ConcurrentLinkedQueue<ClientRequest> requests : unsent.values())
                 if (!requests.isEmpty())
                     return true;
             return false;
         }
 
-        public synchronized List<ClientRequest> removeExpiredRequests(long now, long unsentExpiryMs) {
+        public Collection<ClientRequest> removeExpiredRequests(long now, long unsentExpiryMs) {
             List<ClientRequest> expiredRequests = new ArrayList<>();
-            Iterator<ConcurrentLinkedQueue<ClientRequest>> iterator = unsent.values().iterator();
-            while (iterator.hasNext()) {
-                ConcurrentLinkedQueue<ClientRequest> requests = iterator.next();
+            for (ConcurrentLinkedQueue<ClientRequest> requests : unsent.values()) {
                 Iterator<ClientRequest> requestIterator = requests.iterator();
                 while (requestIterator.hasNext()) {
                     ClientRequest request = requestIterator.next();
@@ -583,24 +585,39 @@ public class ConsumerNetworkClient implements Closeable {
                     } else
                         break;
                 }
-                if (requests.isEmpty())
-                    iterator.remove();
             }
             return expiredRequests;
         }
 
-        public synchronized Collection<ClientRequest> remove(Node node) {
-            ConcurrentLinkedQueue<ClientRequest> requests = unsent.remove(node);
-            return requests == null ? Collections.<ClientRequest>emptyList() : requests;
+        public void clean() {
+            // the lock protects removal from a concurrent put which could otherwise mutate the
+            // queue after it has been removed from the map
+            synchronized (unsent) {
+                Iterator<ConcurrentLinkedQueue<ClientRequest>> iterator = unsent.values().iterator();
+                while (iterator.hasNext()) {
+                    ConcurrentLinkedQueue<ClientRequest> requests = iterator.next();
+                    if (requests.isEmpty())
+                        iterator.remove();
+                }
+            }
+        }
+
+        public Collection<ClientRequest> remove(Node node) {
+            // the lock protects removal from a concurrent put which could otherwise mutate the
+            // queue after it has been removed from the map
+            synchronized (unsent) {
+                ConcurrentLinkedQueue<ClientRequest> requests = unsent.remove(node);
+                return requests == null ? Collections.<ClientRequest>emptyList() : requests;
+            }
         }
 
-        public synchronized Iterator<ClientRequest> requestIterator(Node node) {
+        public Iterator<ClientRequest> requestIterator(Node node) {
             ConcurrentLinkedQueue<ClientRequest> requests = unsent.get(node);
             return requests == null ? Collections.<ClientRequest>emptyIterator() : requests.iterator();
         }
 
-        public synchronized List<Node> nodes() {
-            return new ArrayList<>(unsent.keySet());
+        public Collection<Node> nodes() {
+            return unsent.keySet();
         }
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/022bf129/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index 536e4e8..441206a 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -713,7 +713,7 @@ public class Fetcher<K, V> implements SubscriptionState.Listener {
             Node node = cluster.leaderFor(partition);
             if (node == null) {
                 metadata.requestUpdate();
-            } else if (!this.client.hasPendingRequest(node)) {
+            } else if (!this.client.hasPendingRequests(node)) {
                 // if there is a leader and no in-flight requests, issue a new fetch
                 LinkedHashMap<TopicPartition, FetchRequest.PartitionData> fetch = fetchable.get(node);
                 if (fetch == null) {