You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2018/05/11 00:31:16 UTC
[kafka] branch trunk updated: MINOR: A few small cleanups in AdminClient from KAFKA-6299 (#4989)
This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new a5ea6d1 MINOR: A few small cleanups in AdminClient from KAFKA-6299 (#4989)
a5ea6d1 is described below
commit a5ea6d10a8152df8f1dcb3d7c3fe3635325d82e4
Author: Jason Gustafson <ja...@confluent.io>
AuthorDate: Thu May 10 17:31:12 2018 -0700
MINOR: A few small cleanups in AdminClient from KAFKA-6299 (#4989)
Reviewers: Ismael Juma <is...@juma.me.uk>
---
.../kafka/clients/admin/KafkaAdminClient.java | 132 ++++++++-------------
.../AdminMetadataManager.java | 2 +-
2 files changed, 53 insertions(+), 81 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index 70e9fbd..c9e0e18 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -26,7 +26,7 @@ import org.apache.kafka.clients.NetworkClient;
import org.apache.kafka.clients.admin.DeleteAclsResult.FilterResult;
import org.apache.kafka.clients.admin.DeleteAclsResult.FilterResults;
import org.apache.kafka.clients.admin.DescribeReplicaLogDirsResult.ReplicaLogDirInfo;
-import org.apache.kafka.clients.admin.internal.AdminMetadataManager;
+import org.apache.kafka.clients.admin.internals.AdminMetadataManager;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.internals.ConsumerProtocol;
import org.apache.kafka.clients.consumer.internals.PartitionAssignor;
@@ -738,6 +738,30 @@ public class KafkaAdminClient extends AdminClient {
private final class AdminClientRunnable implements Runnable {
/**
+ * Calls which have not yet been assigned to a node.
+ * Only accessed from this thread.
+ */
+ private final ArrayList<Call> pendingCalls = new ArrayList<>();
+
+ /**
+ * Maps nodes to calls that we want to send.
+ * Only accessed from this thread.
+ */
+ private final Map<Node, List<Call>> callsToSend = new HashMap<>();
+
+ /**
+ * Maps node ID strings to calls that have been sent.
+ * Only accessed from this thread.
+ */
+ private final Map<String, List<Call>> callsInFlight = new HashMap<>();
+
+ /**
+ * Maps correlation IDs to calls that have been sent.
+ * Only accessed from this thread.
+ */
+ private final Map<Integer, Call> correlationIdToCalls = new HashMap<>();
+
+ /**
* Pending calls. Protected by the object monitor.
* This will be null only if the thread has shut down.
*/
@@ -748,9 +772,8 @@ public class KafkaAdminClient extends AdminClient {
*
* @param processor The timeout processor.
*/
- private void timeoutPendingCalls(TimeoutProcessor processor, List<Call> pendingCalls) {
- int numTimedOut = processor.handleTimeouts(pendingCalls,
- "Timed out waiting for a node assignment.");
+ private void timeoutPendingCalls(TimeoutProcessor processor) {
+ int numTimedOut = processor.handleTimeouts(pendingCalls, "Timed out waiting for a node assignment.");
if (numTimedOut > 0)
log.debug("Timed out {} pending calls.", numTimedOut);
}
@@ -759,9 +782,8 @@ public class KafkaAdminClient extends AdminClient {
* Time out calls which have been assigned to nodes.
*
* @param processor The timeout processor.
- * @param callsToSend A map of nodes to the calls they need to handle.
*/
- private int timeoutCallsToSend(TimeoutProcessor processor, Map<Node, List<Call>> callsToSend) {
+ private int timeoutCallsToSend(TimeoutProcessor processor) {
int numTimedOut = 0;
for (List<Call> callList : callsToSend.values()) {
numTimedOut += processor.handleTimeouts(callList,
@@ -778,7 +800,7 @@ public class KafkaAdminClient extends AdminClient {
* This function holds the lock for the minimum amount of time, to avoid blocking
* users of AdminClient who will also take the lock to add new calls.
*/
- private synchronized void drainNewCalls(ArrayList<Call> pendingCalls) {
+ private synchronized void drainNewCalls() {
if (!newCalls.isEmpty()) {
pendingCalls.addAll(newCalls);
newCalls.clear();
@@ -790,11 +812,8 @@ public class KafkaAdminClient extends AdminClient {
*
* @param now The current time in milliseconds.
* @param pendingIter An iterator yielding pending calls.
- * @param callsToSend A map of nodes to the calls they need to handle.
- *
*/
- private void chooseNodesForPendingCalls(long now, Iterator<Call> pendingIter,
- Map<Node, List<Call>> callsToSend) {
+ private void chooseNodesForPendingCalls(long now, Iterator<Call> pendingIter) {
while (pendingIter.hasNext()) {
Call call = pendingIter.next();
Node node = null;
@@ -821,14 +840,9 @@ public class KafkaAdminClient extends AdminClient {
* Send the calls which are ready.
*
* @param now The current time in milliseconds.
- * @param callsToSend The calls to send, by node.
- * @param correlationIdToCalls A map of correlation IDs to calls.
- * @param callsInFlight A map of nodes to the calls they have in flight.
- *
* @return The minimum timeout we need for poll().
*/
- private long sendEligibleCalls(long now, Map<Node, List<Call>> callsToSend,
- Map<Integer, Call> correlationIdToCalls, Map<String, List<Call>> callsInFlight) {
+ private long sendEligibleCalls(long now) {
long pollTimeout = Long.MAX_VALUE;
for (Iterator<Map.Entry<Node, List<Call>>> iter = callsToSend.entrySet().iterator();
iter.hasNext(); ) {
@@ -872,9 +886,8 @@ public class KafkaAdminClient extends AdminClient {
* to time them out is to close the entire connection.
*
* @param processor The timeout processor.
- * @param callsInFlight A map of nodes to the calls they have in flight.
*/
- private void timeoutCallsInFlight(TimeoutProcessor processor, Map<String, List<Call>> callsInFlight) {
+ private void timeoutCallsInFlight(TimeoutProcessor processor) {
int numTimedOut = 0;
for (Map.Entry<String, List<Call>> entry : callsInFlight.entrySet()) {
List<Call> contexts = entry.getValue();
@@ -907,18 +920,12 @@ public class KafkaAdminClient extends AdminClient {
*
* @param now The current time in milliseconds.
* @param responses The latest responses from KafkaClient.
- * @param correlationIdToCall A map of correlation IDs to calls.
- * @param callsInFlight A map of nodes to the calls they have in flight.
- **/
- private void handleResponses(long now,
- List<ClientResponse> responses,
- Map<String, List<Call>> callsInFlight,
- Map<Integer, Call> correlationIdToCall) {
-
+ **/
+ private void handleResponses(long now, List<ClientResponse> responses) {
for (ClientResponse response : responses) {
int correlationId = response.requestHeader().correlationId();
- Call call = correlationIdToCall.get(correlationId);
+ Call call = correlationIdToCalls.get(correlationId);
if (call == null) {
// If the server returns information about a correlation ID we didn't use yet,
// an internal server error has occurred. Close the connection and log an error message.
@@ -930,7 +937,7 @@ public class KafkaAdminClient extends AdminClient {
}
// Stop tracking this call.
- correlationIdToCall.remove(correlationId);
+ correlationIdToCalls.remove(correlationId);
List<Call> calls = callsInFlight.get(response.destination());
if ((calls == null) || (!calls.remove(call))) {
log.error("Internal server error on {}: ignoring call {} in correlationIdToCall " +
@@ -978,8 +985,7 @@ public class KafkaAdminClient extends AdminClient {
/**
* Return true if there are currently active external calls.
*/
- private boolean hasActiveExternalCalls(List<Call> pendingCalls,
- Map<Node, List<Call>> callsToSend, Map<Integer, Call> correlationIdToCalls) {
+ private boolean hasActiveExternalCalls() {
if (hasActiveExternalCalls(pendingCalls)) {
return true;
}
@@ -988,15 +994,11 @@ public class KafkaAdminClient extends AdminClient {
return true;
}
}
- if (hasActiveExternalCalls(correlationIdToCalls.values())) {
- return true;
- }
- return false;
+ return hasActiveExternalCalls(correlationIdToCalls.values());
}
- private boolean threadShouldExit(long now, long curHardShutdownTimeMs, List<Call> pendingCalls,
- Map<Node, List<Call>> callsToSend, Map<Integer, Call> correlationIdToCalls) {
- if (!hasActiveExternalCalls(pendingCalls, callsToSend, correlationIdToCalls)) {
+ private boolean threadShouldExit(long now, long curHardShutdownTimeMs) {
+ if (!hasActiveExternalCalls()) {
log.trace("All work has been completed, and the I/O thread is now exiting.");
return true;
}
@@ -1010,48 +1012,22 @@ public class KafkaAdminClient extends AdminClient {
@Override
public void run() {
- /**
- * Calls which have not yet been assigned to a node.
- * Only accessed from this thread.
- */
- ArrayList<Call> pendingCalls = new ArrayList<>();
-
- /**
- * Maps nodes to calls that we want to send.
- * Only accessed from this thread.
- */
- Map<Node, List<Call>> callsToSend = new HashMap<>();
-
- /**
- * Maps node ID strings to calls that have been sent.
- * Only accessed from this thread.
- */
- Map<String, List<Call>> callsInFlight = new HashMap<>();
-
- /**
- * Maps correlation IDs to calls that have been sent.
- * Only accessed from this thread.
- */
- Map<Integer, Call> correlationIdToCalls = new HashMap<>();
-
long now = time.milliseconds();
log.trace("Thread starting");
while (true) {
// Copy newCalls into pendingCalls.
- drainNewCalls(pendingCalls);
+ drainNewCalls();
// Check if the AdminClient thread should shut down.
long curHardShutdownTimeMs = hardShutdownTimeMs.get();
- if ((curHardShutdownTimeMs != INVALID_SHUTDOWN_TIME) &&
- threadShouldExit(now, curHardShutdownTimeMs, pendingCalls,
- callsToSend, correlationIdToCalls))
+ if ((curHardShutdownTimeMs != INVALID_SHUTDOWN_TIME) && threadShouldExit(now, curHardShutdownTimeMs))
break;
// Handle timeouts.
TimeoutProcessor timeoutProcessor = timeoutProcessorFactory.create(now);
- timeoutPendingCalls(timeoutProcessor, pendingCalls);
- timeoutCallsToSend(timeoutProcessor, callsToSend);
- timeoutCallsInFlight(timeoutProcessor, callsInFlight);
+ timeoutPendingCalls(timeoutProcessor);
+ timeoutCallsToSend(timeoutProcessor);
+ timeoutCallsInFlight(timeoutProcessor);
long pollTimeout = Math.min(1200000, timeoutProcessor.nextTimeoutMs());
if (curHardShutdownTimeMs != INVALID_SHUTDOWN_TIME) {
@@ -1059,7 +1035,7 @@ public class KafkaAdminClient extends AdminClient {
}
// Choose nodes for our pending calls.
- chooseNodesForPendingCalls(now, pendingCalls.iterator(), callsToSend);
+ chooseNodesForPendingCalls(now, pendingCalls.iterator());
long metadataFetchDelayMs = metadataManager.metadataFetchDelayMs(now);
if (metadataFetchDelayMs == 0) {
metadataManager.transitionToUpdatePending(now);
@@ -1067,11 +1043,9 @@ public class KafkaAdminClient extends AdminClient {
// Create a new metadata fetch call and add it to the end of pendingCalls.
// Assign a node for just the new call (we handled the other pending nodes above).
pendingCalls.add(metadataCall);
- chooseNodesForPendingCalls(now, pendingCalls.listIterator(pendingCalls.size() - 1),
- callsToSend);
+ chooseNodesForPendingCalls(now, pendingCalls.listIterator(pendingCalls.size() - 1));
}
- pollTimeout = Math.min(pollTimeout,
- sendEligibleCalls(now, callsToSend, correlationIdToCalls, callsInFlight));
+ pollTimeout = Math.min(pollTimeout, sendEligibleCalls(now));
if (metadataFetchDelayMs > 0) {
pollTimeout = Math.min(pollTimeout, metadataFetchDelayMs);
}
@@ -1083,18 +1057,16 @@ public class KafkaAdminClient extends AdminClient {
// Update the current time and handle the latest responses.
now = time.milliseconds();
- handleResponses(now, responses, callsInFlight, correlationIdToCalls);
+ handleResponses(now, responses);
}
int numTimedOut = 0;
TimeoutProcessor timeoutProcessor = new TimeoutProcessor(Long.MAX_VALUE);
synchronized (this) {
- numTimedOut += timeoutProcessor.handleTimeouts(newCalls,
- "The AdminClient thread has exited.");
+ numTimedOut += timeoutProcessor.handleTimeouts(newCalls, "The AdminClient thread has exited.");
newCalls = null;
}
- numTimedOut += timeoutProcessor.handleTimeouts(pendingCalls,
- "The AdminClient thread has exited.");
- numTimedOut += timeoutCallsToSend(timeoutProcessor, callsToSend);
+ numTimedOut += timeoutProcessor.handleTimeouts(pendingCalls, "The AdminClient thread has exited.");
+ numTimedOut += timeoutCallsToSend(timeoutProcessor);
numTimedOut += timeoutProcessor.handleTimeouts(correlationIdToCalls.values(),
"The AdminClient thread has exited.");
if (numTimedOut > 0) {
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/internal/AdminMetadataManager.java b/clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminMetadataManager.java
similarity index 99%
rename from clients/src/main/java/org/apache/kafka/clients/admin/internal/AdminMetadataManager.java
rename to clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminMetadataManager.java
index 63e7fc8..3806560 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/internal/AdminMetadataManager.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminMetadataManager.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.kafka.clients.admin.internal;
+package org.apache.kafka.clients.admin.internals;
import org.apache.kafka.clients.MetadataUpdater;
import org.apache.kafka.common.Cluster;
--
To stop receiving notification emails like this one, please contact
jgus@apache.org.