You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2018/05/11 00:31:16 UTC

[kafka] branch trunk updated: MINOR: A few small cleanups in AdminClient from KAFKA-6299 (#4989)

This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new a5ea6d1  MINOR: A few small cleanups in AdminClient from KAFKA-6299 (#4989)
a5ea6d1 is described below

commit a5ea6d10a8152df8f1dcb3d7c3fe3635325d82e4
Author: Jason Gustafson <ja...@confluent.io>
AuthorDate: Thu May 10 17:31:12 2018 -0700

    MINOR: A few small cleanups in AdminClient from KAFKA-6299 (#4989)
    
    Reviewers: Ismael Juma <is...@juma.me.uk>
---
 .../kafka/clients/admin/KafkaAdminClient.java      | 132 ++++++++-------------
 .../AdminMetadataManager.java                      |   2 +-
 2 files changed, 53 insertions(+), 81 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index 70e9fbd..c9e0e18 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -26,7 +26,7 @@ import org.apache.kafka.clients.NetworkClient;
 import org.apache.kafka.clients.admin.DeleteAclsResult.FilterResult;
 import org.apache.kafka.clients.admin.DeleteAclsResult.FilterResults;
 import org.apache.kafka.clients.admin.DescribeReplicaLogDirsResult.ReplicaLogDirInfo;
-import org.apache.kafka.clients.admin.internal.AdminMetadataManager;
+import org.apache.kafka.clients.admin.internals.AdminMetadataManager;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.clients.consumer.internals.ConsumerProtocol;
 import org.apache.kafka.clients.consumer.internals.PartitionAssignor;
@@ -738,6 +738,30 @@ public class KafkaAdminClient extends AdminClient {
 
     private final class AdminClientRunnable implements Runnable {
         /**
+         * Calls which have not yet been assigned to a node.
+         * Only accessed from this thread.
+         */
+        private final ArrayList<Call> pendingCalls = new ArrayList<>();
+
+        /**
+         * Maps nodes to calls that we want to send.
+         * Only accessed from this thread.
+         */
+        private final Map<Node, List<Call>> callsToSend = new HashMap<>();
+
+        /**
+         * Maps node ID strings to calls that have been sent.
+         * Only accessed from this thread.
+         */
+        private final Map<String, List<Call>> callsInFlight = new HashMap<>();
+
+        /**
+         * Maps correlation IDs to calls that have been sent.
+         * Only accessed from this thread.
+         */
+        private final Map<Integer, Call> correlationIdToCalls = new HashMap<>();
+
+        /**
          * Pending calls. Protected by the object monitor.
          * This will be null only if the thread has shut down.
          */
@@ -748,9 +772,8 @@ public class KafkaAdminClient extends AdminClient {
          *
          * @param processor     The timeout processor.
          */
-        private void timeoutPendingCalls(TimeoutProcessor processor, List<Call> pendingCalls) {
-            int numTimedOut = processor.handleTimeouts(pendingCalls,
-                    "Timed out waiting for a node assignment.");
+        private void timeoutPendingCalls(TimeoutProcessor processor) {
+            int numTimedOut = processor.handleTimeouts(pendingCalls, "Timed out waiting for a node assignment.");
             if (numTimedOut > 0)
                 log.debug("Timed out {} pending calls.", numTimedOut);
         }
@@ -759,9 +782,8 @@ public class KafkaAdminClient extends AdminClient {
          * Time out calls which have been assigned to nodes.
          *
          * @param processor     The timeout processor.
-         * @param callsToSend   A map of nodes to the calls they need to handle.
          */
-        private int timeoutCallsToSend(TimeoutProcessor processor, Map<Node, List<Call>> callsToSend) {
+        private int timeoutCallsToSend(TimeoutProcessor processor) {
             int numTimedOut = 0;
             for (List<Call> callList : callsToSend.values()) {
                 numTimedOut += processor.handleTimeouts(callList,
@@ -778,7 +800,7 @@ public class KafkaAdminClient extends AdminClient {
          * This function holds the lock for the minimum amount of time, to avoid blocking
          * users of AdminClient who will also take the lock to add new calls.
          */
-        private synchronized void drainNewCalls(ArrayList<Call> pendingCalls) {
+        private synchronized void drainNewCalls() {
             if (!newCalls.isEmpty()) {
                 pendingCalls.addAll(newCalls);
                 newCalls.clear();
@@ -790,11 +812,8 @@ public class KafkaAdminClient extends AdminClient {
          *
          * @param now           The current time in milliseconds.
          * @param pendingIter   An iterator yielding pending calls.
-         * @param callsToSend   A map of nodes to the calls they need to handle.
-         *
          */
-        private void chooseNodesForPendingCalls(long now, Iterator<Call> pendingIter,
-                Map<Node, List<Call>> callsToSend) {
+        private void chooseNodesForPendingCalls(long now, Iterator<Call> pendingIter) {
             while (pendingIter.hasNext()) {
                 Call call = pendingIter.next();
                 Node node = null;
@@ -821,14 +840,9 @@ public class KafkaAdminClient extends AdminClient {
          * Send the calls which are ready.
          *
          * @param now                   The current time in milliseconds.
-         * @param callsToSend           The calls to send, by node.
-         * @param correlationIdToCalls  A map of correlation IDs to calls.
-         * @param callsInFlight         A map of nodes to the calls they have in flight.
-         *
          * @return                      The minimum timeout we need for poll().
          */
-        private long sendEligibleCalls(long now, Map<Node, List<Call>> callsToSend,
-                         Map<Integer, Call> correlationIdToCalls, Map<String, List<Call>> callsInFlight) {
+        private long sendEligibleCalls(long now) {
             long pollTimeout = Long.MAX_VALUE;
             for (Iterator<Map.Entry<Node, List<Call>>> iter = callsToSend.entrySet().iterator();
                      iter.hasNext(); ) {
@@ -872,9 +886,8 @@ public class KafkaAdminClient extends AdminClient {
          * to time them out is to close the entire connection.
          *
          * @param processor         The timeout processor.
-         * @param callsInFlight     A map of nodes to the calls they have in flight.
          */
-        private void timeoutCallsInFlight(TimeoutProcessor processor, Map<String, List<Call>> callsInFlight) {
+        private void timeoutCallsInFlight(TimeoutProcessor processor) {
             int numTimedOut = 0;
             for (Map.Entry<String, List<Call>> entry : callsInFlight.entrySet()) {
                 List<Call> contexts = entry.getValue();
@@ -907,18 +920,12 @@ public class KafkaAdminClient extends AdminClient {
          *
          * @param now                   The current time in milliseconds.
          * @param responses             The latest responses from KafkaClient.
-         * @param correlationIdToCall   A map of correlation IDs to calls.
-         * @param callsInFlight         A map of nodes to the calls they have in flight.
-        **/
-        private void handleResponses(long now,
-                                     List<ClientResponse> responses,
-                                     Map<String, List<Call>> callsInFlight,
-                                     Map<Integer, Call> correlationIdToCall) {
-
+         **/
+        private void handleResponses(long now, List<ClientResponse> responses) {
             for (ClientResponse response : responses) {
                 int correlationId = response.requestHeader().correlationId();
 
-                Call call = correlationIdToCall.get(correlationId);
+                Call call = correlationIdToCalls.get(correlationId);
                 if (call == null) {
                     // If the server returns information about a correlation ID we didn't use yet,
                     // an internal server error has occurred. Close the connection and log an error message.
@@ -930,7 +937,7 @@ public class KafkaAdminClient extends AdminClient {
                 }
 
                 // Stop tracking this call.
-                correlationIdToCall.remove(correlationId);
+                correlationIdToCalls.remove(correlationId);
                 List<Call> calls = callsInFlight.get(response.destination());
                 if ((calls == null) || (!calls.remove(call))) {
                     log.error("Internal server error on {}: ignoring call {} in correlationIdToCall " +
@@ -978,8 +985,7 @@ public class KafkaAdminClient extends AdminClient {
         /**
          * Return true if there are currently active external calls.
          */
-        private boolean hasActiveExternalCalls(List<Call> pendingCalls,
-                Map<Node, List<Call>> callsToSend, Map<Integer, Call> correlationIdToCalls) {
+        private boolean hasActiveExternalCalls() {
             if (hasActiveExternalCalls(pendingCalls)) {
                 return true;
             }
@@ -988,15 +994,11 @@ public class KafkaAdminClient extends AdminClient {
                     return true;
                 }
             }
-            if (hasActiveExternalCalls(correlationIdToCalls.values())) {
-                return true;
-            }
-            return false;
+            return hasActiveExternalCalls(correlationIdToCalls.values());
         }
 
-        private boolean threadShouldExit(long now, long curHardShutdownTimeMs, List<Call> pendingCalls,
-                Map<Node, List<Call>> callsToSend, Map<Integer, Call> correlationIdToCalls) {
-            if (!hasActiveExternalCalls(pendingCalls, callsToSend, correlationIdToCalls)) {
+        private boolean threadShouldExit(long now, long curHardShutdownTimeMs) {
+            if (!hasActiveExternalCalls()) {
                 log.trace("All work has been completed, and the I/O thread is now exiting.");
                 return true;
             }
@@ -1010,48 +1012,22 @@ public class KafkaAdminClient extends AdminClient {
 
         @Override
         public void run() {
-            /**
-             * Calls which have not yet been assigned to a node.
-             * Only accessed from this thread.
-             */
-            ArrayList<Call> pendingCalls = new ArrayList<>();
-
-            /**
-             * Maps nodes to calls that we want to send.
-             * Only accessed from this thread.
-             */
-            Map<Node, List<Call>> callsToSend = new HashMap<>();
-
-            /**
-             * Maps node ID strings to calls that have been sent.
-             * Only accessed from this thread.
-             */
-            Map<String, List<Call>> callsInFlight = new HashMap<>();
-
-            /**
-             * Maps correlation IDs to calls that have been sent.
-             * Only accessed from this thread.
-             */
-            Map<Integer, Call> correlationIdToCalls = new HashMap<>();
-
             long now = time.milliseconds();
             log.trace("Thread starting");
             while (true) {
                 // Copy newCalls into pendingCalls.
-                drainNewCalls(pendingCalls);
+                drainNewCalls();
 
                 // Check if the AdminClient thread should shut down.
                 long curHardShutdownTimeMs = hardShutdownTimeMs.get();
-                if ((curHardShutdownTimeMs != INVALID_SHUTDOWN_TIME) &&
-                        threadShouldExit(now, curHardShutdownTimeMs, pendingCalls,
-                            callsToSend, correlationIdToCalls))
+                if ((curHardShutdownTimeMs != INVALID_SHUTDOWN_TIME) && threadShouldExit(now, curHardShutdownTimeMs))
                     break;
 
                 // Handle timeouts.
                 TimeoutProcessor timeoutProcessor = timeoutProcessorFactory.create(now);
-                timeoutPendingCalls(timeoutProcessor, pendingCalls);
-                timeoutCallsToSend(timeoutProcessor, callsToSend);
-                timeoutCallsInFlight(timeoutProcessor, callsInFlight);
+                timeoutPendingCalls(timeoutProcessor);
+                timeoutCallsToSend(timeoutProcessor);
+                timeoutCallsInFlight(timeoutProcessor);
 
                 long pollTimeout = Math.min(1200000, timeoutProcessor.nextTimeoutMs());
                 if (curHardShutdownTimeMs != INVALID_SHUTDOWN_TIME) {
@@ -1059,7 +1035,7 @@ public class KafkaAdminClient extends AdminClient {
                 }
 
                 // Choose nodes for our pending calls.
-                chooseNodesForPendingCalls(now, pendingCalls.iterator(), callsToSend);
+                chooseNodesForPendingCalls(now, pendingCalls.iterator());
                 long metadataFetchDelayMs = metadataManager.metadataFetchDelayMs(now);
                 if (metadataFetchDelayMs == 0) {
                     metadataManager.transitionToUpdatePending(now);
@@ -1067,11 +1043,9 @@ public class KafkaAdminClient extends AdminClient {
                     // Create a new metadata fetch call and add it to the end of pendingCalls.
                     // Assign a node for just the new call (we handled the other pending nodes above).
                     pendingCalls.add(metadataCall);
-                    chooseNodesForPendingCalls(now, pendingCalls.listIterator(pendingCalls.size() - 1),
-                        callsToSend);
+                    chooseNodesForPendingCalls(now, pendingCalls.listIterator(pendingCalls.size() - 1));
                 }
-                pollTimeout = Math.min(pollTimeout,
-                    sendEligibleCalls(now, callsToSend, correlationIdToCalls, callsInFlight));
+                pollTimeout = Math.min(pollTimeout, sendEligibleCalls(now));
                 if (metadataFetchDelayMs > 0) {
                     pollTimeout = Math.min(pollTimeout, metadataFetchDelayMs);
                 }
@@ -1083,18 +1057,16 @@ public class KafkaAdminClient extends AdminClient {
 
                 // Update the current time and handle the latest responses.
                 now = time.milliseconds();
-                handleResponses(now, responses, callsInFlight, correlationIdToCalls);
+                handleResponses(now, responses);
             }
             int numTimedOut = 0;
             TimeoutProcessor timeoutProcessor = new TimeoutProcessor(Long.MAX_VALUE);
             synchronized (this) {
-                numTimedOut += timeoutProcessor.handleTimeouts(newCalls,
-                        "The AdminClient thread has exited.");
+                numTimedOut += timeoutProcessor.handleTimeouts(newCalls, "The AdminClient thread has exited.");
                 newCalls = null;
             }
-            numTimedOut += timeoutProcessor.handleTimeouts(pendingCalls,
-                "The AdminClient thread has exited.");
-            numTimedOut += timeoutCallsToSend(timeoutProcessor, callsToSend);
+            numTimedOut += timeoutProcessor.handleTimeouts(pendingCalls, "The AdminClient thread has exited.");
+            numTimedOut += timeoutCallsToSend(timeoutProcessor);
             numTimedOut += timeoutProcessor.handleTimeouts(correlationIdToCalls.values(),
                     "The AdminClient thread has exited.");
             if (numTimedOut > 0) {
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/internal/AdminMetadataManager.java b/clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminMetadataManager.java
similarity index 99%
rename from clients/src/main/java/org/apache/kafka/clients/admin/internal/AdminMetadataManager.java
rename to clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminMetadataManager.java
index 63e7fc8..3806560 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/internal/AdminMetadataManager.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminMetadataManager.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.kafka.clients.admin.internal;
+package org.apache.kafka.clients.admin.internals;
 
 import org.apache.kafka.clients.MetadataUpdater;
 import org.apache.kafka.common.Cluster;

-- 
To stop receiving notification emails like this one, please contact
jgus@apache.org.