You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by bo...@apache.org on 2020/10/29 19:42:24 UTC

[kafka] branch 2.7 updated: Revert "KAFKA-9705 part 1: add KIP-590 request header fields (#9144)" (#9523) (#9529)

This is an automated email from the ASF dual-hosted git repository.

boyang pushed a commit to branch 2.7
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.7 by this push:
     new 3023ac6  Revert "KAFKA-9705 part 1: add KIP-590 request header fields (#9144)" (#9523) (#9529)
3023ac6 is described below

commit 3023ac6cede6777afe2f419b4068147ab4a1e04e
Author: Boyang Chen <bo...@confluent.io>
AuthorDate: Thu Oct 29 12:41:09 2020 -0700

    Revert "KAFKA-9705 part 1: add KIP-590 request header fields (#9144)" (#9523) (#9529)
    
    This reverts commit 21dc5231ce9c7398c7ede4dbefa2f2202e06b2d4, as we have decided to use the Envelope request for redirection instead of the initial principal header fields.
    
    Reviewers: Jason Gustafson <ja...@confluent.io>
---
 .../org/apache/kafka/clients/ClientRequest.java    | 20 ++---
 .../java/org/apache/kafka/clients/KafkaClient.java |  6 +-
 .../org/apache/kafka/clients/NetworkClient.java    |  6 +-
 .../kafka/clients/admin/KafkaAdminClient.java      |  2 +-
 .../consumer/internals/ConsumerNetworkClient.java  |  8 +-
 .../kafka/clients/producer/internals/Sender.java   |  6 +-
 .../kafka/common/requests/RequestContext.java      | 14 +---
 .../kafka/common/requests/RequestHeader.java       | 31 ++------
 .../authenticator/SaslServerAuthenticator.java     |  9 ++-
 .../authorizer/AuthorizableRequestContext.java     |  9 ---
 .../resources/common/message/RequestHeader.json    |  8 +-
 .../java/org/apache/kafka/clients/MockClient.java  |  6 +-
 .../apache/kafka/clients/NetworkClientTest.java    | 28 ++-----
 .../clients/producer/internals/SenderTest.java     |  4 +-
 .../kafka/common/requests/RequestContextTest.java  | 12 +--
 .../kafka/common/requests/RequestHeaderTest.java   | 13 ----
 .../scala/kafka/common/InterBrokerSendThread.scala |  2 -
 .../main/scala/kafka/network/SocketServer.scala    | 28 ++-----
 .../scala/kafka/raft/KafkaNetworkChannel.scala     |  2 +-
 .../main/scala/kafka/tools/TestRaftServer.scala    |  5 +-
 .../kafka/common/InterBrokerSendThreadTest.scala   | 12 +--
 .../unit/kafka/network/RequestChannelTest.scala    |  3 +-
 .../unit/kafka/network/SocketServerTest.scala      | 91 ++--------------------
 .../security/authorizer/AclAuthorizerTest.scala    |  2 +-
 .../kafka/server/BaseClientQuotaManagerTest.scala  |  4 +-
 .../scala/unit/kafka/server/KafkaApisTest.scala    |  6 +-
 .../server/ThrottledChannelExpirationTest.scala    |  2 +-
 .../kafka/jmh/acl/AclAuthorizerBenchmark.java      |  2 +-
 .../jmh/metadata/MetadataRequestBenchmark.java     |  3 +-
 29 files changed, 73 insertions(+), 271 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/ClientRequest.java b/clients/src/main/java/org/apache/kafka/clients/ClientRequest.java
index 37b6397..627615c 100644
--- a/clients/src/main/java/org/apache/kafka/clients/ClientRequest.java
+++ b/clients/src/main/java/org/apache/kafka/clients/ClientRequest.java
@@ -34,8 +34,6 @@ public final class ClientRequest {
     private final boolean expectResponse;
     private final int requestTimeoutMs;
     private final RequestCompletionHandler callback;
-    private final String initialPrincipalName;
-    private final String initialClientId;
 
     /**
      * @param destination The brokerId to send the request to
@@ -44,8 +42,6 @@ public final class ClientRequest {
      * @param clientId The client ID to use for the header
      * @param createdTimeMs The unix timestamp in milliseconds for the time at which this request was created.
      * @param expectResponse Should we expect a response message or is this request complete once it is sent?
-     * @param initialPrincipalName The initial principal name if this is a redirect request, or null if this was not redirected
-     * @param initialClientId The initial client id if this is a redirect request, or null if this was not redirected
      * @param callback A callback to execute when the response has been received (or null if no callback is necessary)
      */
     public ClientRequest(String destination,
@@ -55,8 +51,6 @@ public final class ClientRequest {
                          long createdTimeMs,
                          boolean expectResponse,
                          int requestTimeoutMs,
-                         String initialPrincipalName,
-                         String initialClientId,
                          RequestCompletionHandler callback) {
         this.destination = destination;
         this.requestBuilder = requestBuilder;
@@ -66,8 +60,6 @@ public final class ClientRequest {
         this.expectResponse = expectResponse;
         this.requestTimeoutMs = requestTimeoutMs;
         this.callback = callback;
-        this.initialPrincipalName = initialPrincipalName;
-        this.initialClientId = initialClientId;
     }
 
     @Override
@@ -93,13 +85,11 @@ public final class ClientRequest {
     public RequestHeader makeHeader(short version) {
         short requestApiKey = requestBuilder.apiKey().id;
         return new RequestHeader(
-            new RequestHeaderData()
-                .setRequestApiKey(requestApiKey)
-                .setRequestApiVersion(version)
-                .setClientId(clientId)
-                .setCorrelationId(correlationId)
-                .setInitialPrincipalName(initialPrincipalName)
-                .setInitialClientId(initialClientId),
+            new RequestHeaderData().
+                setRequestApiKey(requestApiKey).
+                setRequestApiVersion(version).
+                setClientId(clientId).
+                setCorrelationId(correlationId),
             ApiKeys.forId(requestApiKey).requestHeaderVersion(version));
     }
 
diff --git a/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java b/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java
index b539025..18a7eef 100644
--- a/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java
@@ -189,18 +189,16 @@ public interface KafkaClient extends Closeable {
      *                         cancelling the request. The request may get cancelled sooner if the socket disconnects
      *                         for any reason including if another pending request to the same node timed out first.
      * @param callback the callback to invoke when we get a response
-     * @param initialPrincipalName the initial client principal name, when building a forward request
-     * @param initialClientId the initial client id, when building a forward request
      */
     ClientRequest newClientRequest(String nodeId,
                                    AbstractRequest.Builder<?> requestBuilder,
                                    long createdTimeMs,
                                    boolean expectResponse,
                                    int requestTimeoutMs,
-                                   String initialPrincipalName,
-                                   String initialClientId,
                                    RequestCompletionHandler callback);
 
+
+
     /**
      * Initiates shutdown of this client. This method may be invoked from another thread while this
      * client is being polled. No further requests may be sent using the client. The current poll()
diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
index cf6ef87..5287124 100644
--- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
@@ -1199,7 +1199,7 @@ public class NetworkClient implements KafkaClient {
                                           AbstractRequest.Builder<?> requestBuilder,
                                           long createdTimeMs,
                                           boolean expectResponse) {
-        return newClientRequest(nodeId, requestBuilder, createdTimeMs, expectResponse, defaultRequestTimeoutMs, null, null, null);
+        return newClientRequest(nodeId, requestBuilder, createdTimeMs, expectResponse, defaultRequestTimeoutMs, null);
     }
 
     // visible for testing
@@ -1217,11 +1217,9 @@ public class NetworkClient implements KafkaClient {
                                           long createdTimeMs,
                                           boolean expectResponse,
                                           int requestTimeoutMs,
-                                          String initialPrincipalName,
-                                          String initialClientId,
                                           RequestCompletionHandler callback) {
         return new ClientRequest(nodeId, requestBuilder, nextCorrelationId(), clientId, createdTimeMs, expectResponse,
-                requestTimeoutMs, initialPrincipalName, initialClientId, callback);
+                requestTimeoutMs, callback);
     }
 
     public boolean discoverBrokerVersions() {
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index dd9010f..3a21d38 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -1090,7 +1090,7 @@ public class KafkaAdminClient extends AdminClient {
                     continue;
                 }
                 ClientRequest clientRequest = client.newClientRequest(node.idString(), requestBuilder, now,
-                        true, requestTimeoutMs, null, null, null);
+                        true, requestTimeoutMs, null);
                 log.debug("Sending {} to {}. correlationId={}", requestBuilder, node, clientRequest.correlationId());
                 client.send(clientRequest, now);
                 getOrCreateListValue(callsInFlight, node.idString()).add(call);
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
index 5b732dc..8926abe 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
@@ -127,7 +127,7 @@ public class ConsumerNetworkClient implements Closeable {
         long now = time.milliseconds();
         RequestFutureCompletionHandler completionHandler = new RequestFutureCompletionHandler();
         ClientRequest clientRequest = client.newClientRequest(node.idString(), requestBuilder, now, true,
-            requestTimeoutMs, null, null, completionHandler);
+                requestTimeoutMs, completionHandler);
         unsent.put(node, clientRequest);
 
         // wakeup the client in case it is blocking in poll so that we can send the queued request
@@ -432,8 +432,8 @@ public class ConsumerNetworkClient implements Closeable {
                     RequestFutureCompletionHandler handler = (RequestFutureCompletionHandler) request.callback();
                     AuthenticationException authenticationException = client.authenticationException(node);
                     handler.onComplete(new ClientResponse(request.makeHeader(request.requestBuilder().latestAllowedVersion()),
-                        request.callback(), request.destination(), request.createdTimeMs(), now, true,
-                        null, authenticationException, null));
+                            request.callback(), request.destination(), request.createdTimeMs(), now, true,
+                            null, authenticationException, null));
                 }
             }
         }
@@ -594,7 +594,7 @@ public class ConsumerNetworkClient implements Closeable {
                 future.raise(response.authenticationException());
             } else if (response.wasDisconnected()) {
                 log.debug("Cancelled request with header {} due to node {} being disconnected",
-                    response.requestHeader(), response.destination());
+                        response.requestHeader(), response.destination());
                 future.raise(DisconnectException.INSTANCE);
             } else if (response.versionMismatch() != null) {
                 future.raise(response.versionMismatch());
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
index 19b2cb1..abcc657 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
@@ -469,8 +469,8 @@ public class Sender implements Runnable {
                 time.sleep(nextRequestHandler.retryBackoffMs());
 
             long currentTimeMs = time.milliseconds();
-            ClientRequest clientRequest = client.newClientRequest(targetNode.idString(), requestBuilder, currentTimeMs, true,
-                    requestTimeoutMs,  null, null, nextRequestHandler);
+            ClientRequest clientRequest = client.newClientRequest(
+                targetNode.idString(), requestBuilder, currentTimeMs, true, requestTimeoutMs, nextRequestHandler);
             log.debug("Sending transactional request {} to node {} with correlation ID {}", requestBuilder, targetNode, clientRequest.correlationId());
             client.send(clientRequest, currentTimeMs);
             transactionManager.setInFlightCorrelationId(clientRequest.correlationId());
@@ -758,7 +758,7 @@ public class Sender implements Runnable {
 
         String nodeId = Integer.toString(destination);
         ClientRequest clientRequest = client.newClientRequest(nodeId, requestBuilder, now, acks != 0,
-                requestTimeoutMs, null, null, callback);
+                requestTimeoutMs, callback);
         client.send(clientRequest, now);
         log.trace("Sent produce request to {}: {}", nodeId, requestBuilder);
     }
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/RequestContext.java b/clients/src/main/java/org/apache/kafka/common/requests/RequestContext.java
index f834135..71fbc33 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/RequestContext.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/RequestContext.java
@@ -40,7 +40,6 @@ public class RequestContext implements AuthorizableRequestContext {
     public final ListenerName listenerName;
     public final SecurityProtocol securityProtocol;
     public final ClientInformation clientInformation;
-    public final boolean fromPrivilegedListener;
 
     public RequestContext(RequestHeader header,
                           String connectionId,
@@ -48,8 +47,7 @@ public class RequestContext implements AuthorizableRequestContext {
                           KafkaPrincipal principal,
                           ListenerName listenerName,
                           SecurityProtocol securityProtocol,
-                          ClientInformation clientInformation,
-                          boolean fromPrivilegedListener) {
+                          ClientInformation clientInformation) {
         this.header = header;
         this.connectionId = connectionId;
         this.clientAddress = clientAddress;
@@ -57,7 +55,6 @@ public class RequestContext implements AuthorizableRequestContext {
         this.listenerName = listenerName;
         this.securityProtocol = securityProtocol;
         this.clientInformation = clientInformation;
-        this.fromPrivilegedListener = fromPrivilegedListener;
     }
 
     public RequestAndSize parseRequest(ByteBuffer buffer) {
@@ -77,9 +74,7 @@ public class RequestContext implements AuthorizableRequestContext {
                         ", apiVersion: " + header.apiVersion() +
                         ", connectionId: " + connectionId +
                         ", listenerName: " + listenerName +
-                        ", principal: " + principal +
-                        ", initialPrincipal: " + initialPrincipalName() +
-                        ", initialClientId: " + header.initialClientId(), ex);
+                        ", principal: " + principal, ex);
             }
         }
     }
@@ -139,9 +134,4 @@ public class RequestContext implements AuthorizableRequestContext {
     public int correlationId() {
         return header.correlationId();
     }
-
-    @Override
-    public String initialPrincipalName() {
-        return header.initialPrincipalName();
-    }
 }
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/RequestHeader.java b/clients/src/main/java/org/apache/kafka/common/requests/RequestHeader.java
index a5d90d2..3d80c4e 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/RequestHeader.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/RequestHeader.java
@@ -37,22 +37,11 @@ public class RequestHeader implements AbstractRequestResponse {
     }
 
     public RequestHeader(ApiKeys requestApiKey, short requestVersion, String clientId, int correlationId) {
-        this(requestApiKey, requestVersion, clientId, correlationId, null, null);
-    }
-
-    public RequestHeader(ApiKeys requestApiKey,
-                         short requestVersion,
-                         String clientId,
-                         int correlationId,
-                         String initialPrincipalName,
-                         String initialClientId) {
-        this(new RequestHeaderData()
-                 .setRequestApiKey(requestApiKey.id)
-                 .setRequestApiVersion(requestVersion)
-                 .setClientId(clientId)
-                 .setCorrelationId(correlationId)
-                 .setInitialPrincipalName(initialPrincipalName)
-                 .setInitialClientId(initialClientId),
+        this(new RequestHeaderData().
+                setRequestApiKey(requestApiKey.id).
+                setRequestApiVersion(requestVersion).
+                setClientId(clientId).
+                setCorrelationId(correlationId),
             ApiKeys.forId(requestApiKey.id).requestHeaderVersion(requestVersion));
     }
 
@@ -81,14 +70,6 @@ public class RequestHeader implements AbstractRequestResponse {
         return data.clientId();
     }
 
-    public String initialPrincipalName() {
-        return data.initialPrincipalName();
-    }
-
-    public String initialClientId() {
-        return data.initialClientId();
-    }
-
     public int correlationId() {
         return data.correlationId();
     }
@@ -125,8 +106,6 @@ public class RequestHeader implements AbstractRequestResponse {
                 ", apiVersion=" + apiVersion() +
                 ", clientId=" + clientId() +
                 ", correlationId=" + correlationId() +
-                ", initialPrincipalName=" + initialPrincipalName() +
-                ", initialClientId=" + initialClientId() +
                 ")";
     }
 
diff --git a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
index 20dbf7b..ecd5d4f 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
@@ -237,12 +237,15 @@ public class SaslServerAuthenticator implements Authenticator {
         if (saslState != SaslState.REAUTH_PROCESS_HANDSHAKE) {
             if (netOutBuffer != null && !flushNetOutBufferAndUpdateInterestOps())
                 return;
+
             if (saslServer != null && saslServer.isComplete()) {
                 setSaslState(SaslState.COMPLETE);
                 return;
             }
+
             // allocate on heap (as opposed to any socket server memory pool)
             if (netInBuffer == null) netInBuffer = new NetworkReceive(MAX_RECEIVE_SIZE, connectionId);
+
             netInBuffer.readFrom(transportLayer);
             if (!netInBuffer.complete())
                 return;
@@ -407,7 +410,7 @@ public class SaslServerAuthenticator implements Authenticator {
             ApiKeys apiKey = header.apiKey();
             short version = header.apiVersion();
             RequestContext requestContext = new RequestContext(header, connectionId, clientAddress(),
-                    KafkaPrincipal.ANONYMOUS, listenerName, securityProtocol, ClientInformation.EMPTY, false);
+                    KafkaPrincipal.ANONYMOUS, listenerName, securityProtocol, ClientInformation.EMPTY);
             RequestAndSize requestAndSize = requestContext.parseRequest(requestBuffer);
             if (apiKey != ApiKeys.SASL_AUTHENTICATE) {
                 IllegalSaslStateException e = new IllegalSaslStateException("Unexpected Kafka request of type " + apiKey + " during SASL authentication.");
@@ -493,7 +496,7 @@ public class SaslServerAuthenticator implements Authenticator {
 
 
             RequestContext requestContext = new RequestContext(header, connectionId, clientAddress(),
-                    KafkaPrincipal.ANONYMOUS, listenerName, securityProtocol, ClientInformation.EMPTY, false);
+                    KafkaPrincipal.ANONYMOUS, listenerName, securityProtocol, ClientInformation.EMPTY);
             RequestAndSize requestAndSize = requestContext.parseRequest(requestBuffer);
             if (apiKey == ApiKeys.API_VERSIONS)
                 handleApiVersionsRequest(requestContext, (ApiVersionsRequest) requestAndSize.request);
@@ -708,6 +711,6 @@ public class SaslServerAuthenticator implements Authenticator {
 
         private long zeroIfNegative(long value) {
             return Math.max(0L, value);
-        }
+        }        
     }
 }
diff --git a/clients/src/main/java/org/apache/kafka/server/authorizer/AuthorizableRequestContext.java b/clients/src/main/java/org/apache/kafka/server/authorizer/AuthorizableRequestContext.java
index eb9b02f..f68b938 100644
--- a/clients/src/main/java/org/apache/kafka/server/authorizer/AuthorizableRequestContext.java
+++ b/clients/src/main/java/org/apache/kafka/server/authorizer/AuthorizableRequestContext.java
@@ -69,13 +69,4 @@ public interface AuthorizableRequestContext {
      * Returns the correlation id from the request header.
      */
     int correlationId();
-
-    /**
-     * Returns the initial principal name for a forwarded request, or null if the
-     * request is not forwarding. Note that this field should be used only for
-     * audit logging purpose, not for authorization.
-     */
-    default String initialPrincipalName() {
-        return null;
-    }
 }
diff --git a/clients/src/main/resources/common/message/RequestHeader.json b/clients/src/main/resources/common/message/RequestHeader.json
index 4285633..fbf4e2c 100644
--- a/clients/src/main/resources/common/message/RequestHeader.json
+++ b/clients/src/main/resources/common/message/RequestHeader.json
@@ -37,12 +37,6 @@
     // Since the client is sending the ApiVersionsRequest in order to discover what
     // versions are supported, the client does not know the best version to use.
     { "name": "ClientId", "type": "string", "versions": "1+", "nullableVersions": "1+", "ignorable": true,
-      "flexibleVersions": "none", "about": "The client ID string." },
-    { "name": "InitialPrincipalName", "type": "string", "tag": 0, "taggedVersions": "2+",
-      "nullableVersions": "2+", "default": "null", "ignorable": true,
-      "about": "Optional value of the initial principal name when the request is redirected by a broker, for audit logging and quota purpose." },
-    { "name": "InitialClientId", "type": "string", "tag": 1, "taggedVersions": "2+",
-      "nullableVersions": "2+", "default": "null", "ignorable": true,
-      "about": "Optional value of the initial client id when the request is redirected by a broker, for quota purpose." }
+      "flexibleVersions": "none", "about": "The client ID string." }
   ]
 }
diff --git a/clients/src/test/java/org/apache/kafka/clients/MockClient.java b/clients/src/test/java/org/apache/kafka/clients/MockClient.java
index 20f5843..214ecc5 100644
--- a/clients/src/test/java/org/apache/kafka/clients/MockClient.java
+++ b/clients/src/test/java/org/apache/kafka/clients/MockClient.java
@@ -507,7 +507,7 @@ public class MockClient implements KafkaClient {
     @Override
     public ClientRequest newClientRequest(String nodeId, AbstractRequest.Builder<?> requestBuilder, long createdTimeMs,
                                           boolean expectResponse) {
-        return newClientRequest(nodeId, requestBuilder, createdTimeMs, expectResponse, 5000, null, null, null);
+        return newClientRequest(nodeId, requestBuilder, createdTimeMs, expectResponse, 5000, null);
     }
 
     @Override
@@ -516,11 +516,9 @@ public class MockClient implements KafkaClient {
                                           long createdTimeMs,
                                           boolean expectResponse,
                                           int requestTimeoutMs,
-                                          String initialPrincipalName,
-                                          String initialClientId,
                                           RequestCompletionHandler callback) {
         return new ClientRequest(nodeId, requestBuilder, correlation++, "mockClientId", createdTimeMs,
-                expectResponse, requestTimeoutMs, initialPrincipalName, initialClientId, callback);
+                expectResponse, requestTimeoutMs, callback);
     }
 
     @Override
diff --git a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
index a8a1efc..896c237 100644
--- a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
@@ -152,20 +152,6 @@ public class NetworkClientTest {
     }
 
     @Test
-    public void testIncludeInitialPrincipalNameAndClientIdInHeader() {
-        MetadataRequest.Builder builder = new MetadataRequest.Builder(Collections.singletonList("test"), true);
-        final String initialPrincipalName = "initial-principal";
-        final String initialClientId = "initial-client";
-
-        ClientRequest request = client.newClientRequest("5", builder, 0L, false,
-            defaultRequestTimeoutMs, initialPrincipalName, initialClientId, null);
-        RequestHeader header = request.makeHeader(builder.latestAllowedVersion());
-
-        assertEquals(initialPrincipalName, header.initialPrincipalName());
-        assertEquals(initialClientId, header.initialClientId());
-    }
-
-    @Test
     public void testClose() {
         client.ready(node, time.milliseconds());
         awaitReady(client, node);
@@ -208,8 +194,8 @@ public class NetworkClientTest {
                 Collections.emptyMap(),
                 null);
         TestCallbackHandler handler = new TestCallbackHandler();
-        ClientRequest request = networkClient.newClientRequest(node.idString(), builder, time.milliseconds(), true,
-                defaultRequestTimeoutMs, null, null, handler);
+        ClientRequest request = networkClient.newClientRequest(
+                node.idString(), builder, time.milliseconds(), true, defaultRequestTimeoutMs, handler);
         networkClient.send(request, time.milliseconds());
         networkClient.poll(1, time.milliseconds());
         assertEquals(1, networkClient.inFlightRequestCount());
@@ -454,7 +440,7 @@ public class NetworkClientTest {
         TestCallbackHandler handler = new TestCallbackHandler();
         int requestTimeoutMs = defaultRequestTimeoutMs + 5000;
         ClientRequest request = client.newClientRequest(node.idString(), builder, time.milliseconds(), true,
-                requestTimeoutMs,  null, null, handler);
+                requestTimeoutMs, handler);
         assertEquals(requestTimeoutMs, request.requestTimeoutMs());
         testRequestTimeout(request);
     }
@@ -522,7 +508,7 @@ public class NetworkClientTest {
             null);
         TestCallbackHandler handler = new TestCallbackHandler();
         ClientRequest request = client.newClientRequest(node.idString(), builder, time.milliseconds(), true,
-                defaultRequestTimeoutMs,  null, null, handler);
+                defaultRequestTimeoutMs, handler);
         client.send(request, time.milliseconds());
         client.poll(1, time.milliseconds());
         ResponseHeader respHeader =
@@ -608,7 +594,7 @@ public class NetworkClientTest {
                 Collections.emptyMap());
         TestCallbackHandler handler = new TestCallbackHandler();
         ClientRequest request = client.newClientRequest(nodeId, builder, time.milliseconds(), true,
-                defaultRequestTimeoutMs, null, null, handler);
+                defaultRequestTimeoutMs, handler);
         client.send(request, time.milliseconds());
         return request.correlationId();
     }
@@ -889,11 +875,11 @@ public class NetworkClientTest {
             }
         };
 
-        ClientRequest request1 = client.newClientRequest(node.idString(), builder, now, true, defaultRequestTimeoutMs, null, null, callback);
+        ClientRequest request1 = client.newClientRequest(node.idString(), builder, now, true, defaultRequestTimeoutMs, callback);
         client.send(request1, now);
         client.poll(0, now);
 
-        ClientRequest request2 = client.newClientRequest(node.idString(), builder, now, true, defaultRequestTimeoutMs, null, null, callback);
+        ClientRequest request2 = client.newClientRequest(node.idString(), builder, now, true, defaultRequestTimeoutMs, callback);
         client.send(request2, now);
         client.poll(0, now);
 
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
index 771f699..10deb42 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
@@ -2299,7 +2299,7 @@ public class SenderTest {
         InOrder inOrder = inOrder(client);
         inOrder.verify(client, atLeastOnce()).ready(any(), anyLong());
         inOrder.verify(client, atLeastOnce()).newClientRequest(anyString(), any(), anyLong(), anyBoolean(), anyInt(),
-                any(), any(), any());
+                any());
         inOrder.verify(client, atLeastOnce()).send(any(), anyLong());
         inOrder.verify(client).poll(eq(0L), anyLong());
         inOrder.verify(client).poll(eq(accumulator.getDeliveryTimeoutMs()), anyLong());
@@ -2664,7 +2664,7 @@ public class SenderTest {
     private TransactionManager createTransactionManager() {
         return new TransactionManager(new LogContext(), null, 0, 100L, new ApiVersions(), false);
     }
-
+    
     private void setupWithTransactionState(TransactionManager transactionManager) {
         setupWithTransactionState(transactionManager, false, null, true);
     }
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestContextTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestContextTest.java
index 7e0901f..04d7726 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/RequestContextTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestContextTest.java
@@ -29,7 +29,6 @@ import org.apache.kafka.common.security.auth.SecurityProtocol;
 import org.junit.Test;
 
 import java.net.InetAddress;
-import java.net.UnknownHostException;
 import java.nio.ByteBuffer;
 
 import static org.junit.Assert.assertEquals;
@@ -43,7 +42,7 @@ public class RequestContextTest {
 
         RequestHeader header = new RequestHeader(ApiKeys.API_VERSIONS, Short.MAX_VALUE, "", correlationId);
         RequestContext context = new RequestContext(header, "0", InetAddress.getLocalHost(), KafkaPrincipal.ANONYMOUS,
-                new ListenerName("ssl"), SecurityProtocol.SASL_SSL, ClientInformation.EMPTY, false);
+                new ListenerName("ssl"), SecurityProtocol.SASL_SSL, ClientInformation.EMPTY);
         assertEquals(0, context.apiVersion());
 
         // Write some garbage to the request buffer. This should be ignored since we will treat
@@ -80,13 +79,4 @@ public class RequestContextTest {
         assertTrue(response.data.apiKeys().isEmpty());
     }
 
-    @Test
-    public void testInitialPrincipalName() throws UnknownHostException {
-        final String initialPrincipalName = "initial-principal";
-        RequestHeader header = new RequestHeader(ApiKeys.API_VERSIONS, Short.MAX_VALUE, "", 1, initialPrincipalName, null);
-        RequestContext context = new RequestContext(header, "0", InetAddress.getLocalHost(), KafkaPrincipal.ANONYMOUS,
-            new ListenerName("ssl"), SecurityProtocol.SASL_SSL, ClientInformation.EMPTY, false);
-
-        assertEquals(initialPrincipalName, context.initialPrincipalName());
-    }
 }
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestHeaderTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestHeaderTest.java
index cc8a72d..2842ae8 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/RequestHeaderTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestHeaderTest.java
@@ -73,17 +73,4 @@ public class RequestHeaderTest {
         RequestHeader deserialized = RequestHeader.parse(buffer);
         assertEquals(header, deserialized);
     }
-
-    @Test
-    public void testRequestHeaderWithInitialPrincipalAndClientId() {
-        final String initialPrincipalName = "initial-principal";
-        final String initialClientId = "initial-client";
-        RequestHeader header = new RequestHeader(ApiKeys.CREATE_DELEGATION_TOKEN, (short) 2, "", 10,
-            initialPrincipalName, initialClientId);
-        assertEquals(2, header.headerVersion());
-        ByteBuffer buffer = toBuffer(header.toStruct());
-        assertEquals(17 + initialPrincipalName.length() + initialClientId.length(), buffer.remaining());
-        RequestHeader deserialized = RequestHeader.parse(buffer);
-        assertEquals(header, deserialized);
-    }
 }
diff --git a/core/src/main/scala/kafka/common/InterBrokerSendThread.scala b/core/src/main/scala/kafka/common/InterBrokerSendThread.scala
index a9048a0..11e1aa8 100644
--- a/core/src/main/scala/kafka/common/InterBrokerSendThread.scala
+++ b/core/src/main/scala/kafka/common/InterBrokerSendThread.scala
@@ -63,8 +63,6 @@ abstract class InterBrokerSendThread(name: String,
           now,
           true,
           requestTimeoutMs,
-          null,
-          null,
           completionHandler))
     }
 
diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala
index dac2882..7b816c4 100644
--- a/core/src/main/scala/kafka/network/SocketServer.scala
+++ b/core/src/main/scala/kafka/network/SocketServer.scala
@@ -261,8 +261,7 @@ class SocketServer(val config: KafkaConfig,
     endpointOpt.foreach { endpoint =>
       connectionQuotas.addListener(config, endpoint.listenerName)
       val controlPlaneAcceptor = createAcceptor(endpoint, ControlPlaneMetricPrefix)
-      val controlPlaneProcessor = newProcessor(nextProcessorId, controlPlaneRequestChannelOpt.get,
-        connectionQuotas, endpoint.listenerName, endpoint.securityProtocol, memoryPool, isPrivilegedListener = true)
+      val controlPlaneProcessor = newProcessor(nextProcessorId, controlPlaneRequestChannelOpt.get, connectionQuotas, endpoint.listenerName, endpoint.securityProtocol, memoryPool)
       controlPlaneAcceptorOpt = Some(controlPlaneAcceptor)
       controlPlaneProcessorOpt = Some(controlPlaneProcessor)
       val listenerProcessors = new ArrayBuffer[Processor]()
@@ -285,11 +284,8 @@ class SocketServer(val config: KafkaConfig,
     val listenerName = endpoint.listenerName
     val securityProtocol = endpoint.securityProtocol
     val listenerProcessors = new ArrayBuffer[Processor]()
-    val isPrivilegedListener = controlPlaneRequestChannelOpt.isEmpty && config.interBrokerListenerName == listenerName
-
     for (_ <- 0 until newProcessorsPerListener) {
-      val processor = newProcessor(nextProcessorId, dataPlaneRequestChannel, connectionQuotas,
-        listenerName, securityProtocol, memoryPool, isPrivilegedListener)
+      val processor = newProcessor(nextProcessorId, dataPlaneRequestChannel, connectionQuotas, listenerName, securityProtocol, memoryPool)
       listenerProcessors += processor
       dataPlaneRequestChannel.addProcessor(processor)
       nextProcessorId += 1
@@ -415,7 +411,7 @@ class SocketServer(val config: KafkaConfig,
 
   // `protected` for test usage
   protected[network] def newProcessor(id: Int, requestChannel: RequestChannel, connectionQuotas: ConnectionQuotas, listenerName: ListenerName,
-                                      securityProtocol: SecurityProtocol, memoryPool: MemoryPool, isPrivilegedListener: Boolean): Processor = {
+                                      securityProtocol: SecurityProtocol, memoryPool: MemoryPool): Processor = {
     new Processor(id,
       time,
       config.socketRequestMaxBytes,
@@ -430,7 +426,6 @@ class SocketServer(val config: KafkaConfig,
       credentialProvider,
       memoryPool,
       logContext,
-      isPrivilegedListener = isPrivilegedListener
     )
   }
 
@@ -655,8 +650,8 @@ private[kafka] class Acceptor(val endPoint: EndPoint,
   }
 
   /**
-   * Create a server socket to listen for connections on.
-   */
+  * Create a server socket to listen for connections on.
+  */
   private def openServerSocket(host: String, port: Int): ServerSocketChannel = {
     val socketAddress =
       if (host == null || host.trim.isEmpty)
@@ -730,12 +725,6 @@ private[kafka] object Processor {
 /**
  * Thread that processes all requests from a single connection. There are N of these running in parallel
  * each of which has its own selector
- *
- * @param isPrivilegedListener The privileged listener flag is used as one factor to determine whether
- *                             a certain request is forwarded or not. When the control plane is defined,
- *                             the control plane processor would be fellow broker's choice for sending
- *                             forwarding requests; if the control plane is not defined, the processor
- *                             relying on the inter broker listener would be acting as the privileged listener.
  */
 private[kafka] class Processor(val id: Int,
                                time: Time,
@@ -751,8 +740,7 @@ private[kafka] class Processor(val id: Int,
                                credentialProvider: CredentialProvider,
                                memoryPool: MemoryPool,
                                logContext: LogContext,
-                               connectionQueueSize: Int = ConnectionQueueSize,
-                               isPrivilegedListener: Boolean = false) extends AbstractServerThread(connectionQuotas) with KafkaMetricsGroup {
+                               connectionQueueSize: Int = ConnectionQueueSize) extends AbstractServerThread(connectionQuotas) with KafkaMetricsGroup {
 
   private object ConnectionId {
     def fromString(s: String): Option[ConnectionId] = s.split("-") match {
@@ -781,7 +769,7 @@ private[kafka] class Processor(val id: Int,
   newGauge(IdlePercentMetricName, () => {
     Option(metrics.metric(metrics.metricName("io-wait-ratio", MetricsGroup, metricTags))).fold(0.0)(m =>
       Math.min(m.metricValue.asInstanceOf[Double], 1.0))
-  },
+    },
     // for compatibility, only add a networkProcessor tag to the Yammer Metrics alias (the equivalent Selector metric
     // also includes the listener name)
     Map(NetworkProcessorMetricTag -> id.toString)
@@ -969,7 +957,7 @@ private[kafka] class Processor(val id: Int,
                 val connectionId = receive.source
                 val context = new RequestContext(header, connectionId, channel.socketAddress,
                   channel.principal, listenerName, securityProtocol,
-                  channel.channelMetadataRegistry.clientInformation, isPrivilegedListener)
+                  channel.channelMetadataRegistry.clientInformation)
                 val req = new RequestChannel.Request(processor = id, context = context,
                   startTimeNanos = nowNanos, memoryPool, receive.payload, requestChannel.metrics)
                 // KIP-511: ApiVersionsRequest is intercepted here to catch the client software name
diff --git a/core/src/main/scala/kafka/raft/KafkaNetworkChannel.scala b/core/src/main/scala/kafka/raft/KafkaNetworkChannel.scala
index cac21a4..4637433 100644
--- a/core/src/main/scala/kafka/raft/KafkaNetworkChannel.scala
+++ b/core/src/main/scala/kafka/raft/KafkaNetworkChannel.scala
@@ -107,7 +107,7 @@ class KafkaNetworkChannel(time: Time,
     val correlationId = req.correlationId
     val createdTimeMs = req.createdTimeMs
     new ClientRequest(destination, request, correlationId, clientId, createdTimeMs, true,
-      requestTimeoutMs, null, null, null)
+      requestTimeoutMs, null)
   }
 
   override def send(message: RaftMessage): Unit = {
diff --git a/core/src/main/scala/kafka/tools/TestRaftServer.scala b/core/src/main/scala/kafka/tools/TestRaftServer.scala
index 85444b4..a99bc6e 100644
--- a/core/src/main/scala/kafka/tools/TestRaftServer.scala
+++ b/core/src/main/scala/kafka/tools/TestRaftServer.scala
@@ -317,7 +317,6 @@ class RaftSocketServer(
     listenerName: ListenerName,
     securityProtocol: SecurityProtocol,
     memoryPool: MemoryPool,
-    isPrivilegedListener: Boolean
   ): Processor = {
     new Processor(id,
       time,
@@ -332,9 +331,7 @@ class RaftSocketServer(
       metrics,
       credentialProvider,
       memoryPool,
-      logContext,
-      isPrivilegedListener = isPrivilegedListener
-    ) {
+      logContext) {
       // We extend this API to skip the check for only enabled APIs. This
       // gets us access to Vote, BeginQuorumEpoch, etc. which are not usable
       // from the Kafka broker yet.
diff --git a/core/src/test/scala/kafka/common/InterBrokerSendThreadTest.scala b/core/src/test/scala/kafka/common/InterBrokerSendThreadTest.scala
index 5f8a8ee..f5110bf 100644
--- a/core/src/test/scala/kafka/common/InterBrokerSendThreadTest.scala
+++ b/core/src/test/scala/kafka/common/InterBrokerSendThreadTest.scala
@@ -64,7 +64,7 @@ class InterBrokerSendThreadTest {
       override def generateRequests() = List[RequestAndCompletionHandler](handler)
     }
 
-    val clientRequest = new ClientRequest("dest", request, 0, "1", 0, true, requestTimeoutMs, null, null, handler.handler)
+    val clientRequest = new ClientRequest("dest", request, 0, "1", 0, true, requestTimeoutMs, handler.handler)
 
     EasyMock.expect(networkClient.newClientRequest(
       EasyMock.eq("1"),
@@ -72,8 +72,6 @@ class InterBrokerSendThreadTest {
       EasyMock.anyLong(),
       EasyMock.eq(true),
       EasyMock.eq(requestTimeoutMs),
-      EasyMock.eq(null),
-      EasyMock.eq(null),
       EasyMock.same(handler.handler)))
       .andReturn(clientRequest)
 
@@ -103,7 +101,7 @@ class InterBrokerSendThreadTest {
       override def generateRequests() = List[RequestAndCompletionHandler](requestAndCompletionHandler)
     }
 
-    val clientRequest = new ClientRequest("dest", request, 0, "1", 0, true, requestTimeoutMs, null, null, requestAndCompletionHandler.handler)
+    val clientRequest = new ClientRequest("dest", request, 0, "1", 0, true, requestTimeoutMs, requestAndCompletionHandler.handler)
 
     EasyMock.expect(networkClient.newClientRequest(
       EasyMock.eq("1"),
@@ -111,8 +109,6 @@ class InterBrokerSendThreadTest {
       EasyMock.anyLong(),
       EasyMock.eq(true),
       EasyMock.eq(requestTimeoutMs),
-      EasyMock.eq(null),
-      EasyMock.eq(null),
       EasyMock.same(requestAndCompletionHandler.handler)))
       .andReturn(clientRequest)
 
@@ -156,8 +152,6 @@ class InterBrokerSendThreadTest {
       time.milliseconds(),
       true,
       requestTimeoutMs,
-      null,
-      null,
       handler.handler)
     time.sleep(1500)
 
@@ -167,8 +161,6 @@ class InterBrokerSendThreadTest {
       EasyMock.eq(time.milliseconds()),
       EasyMock.eq(true),
       EasyMock.eq(requestTimeoutMs),
-      EasyMock.eq(null),
-      EasyMock.eq(null),
       EasyMock.same(handler.handler)))
       .andReturn(clientRequest)
 
diff --git a/core/src/test/scala/unit/kafka/network/RequestChannelTest.scala b/core/src/test/scala/unit/kafka/network/RequestChannelTest.scala
index d9740ba..ce19491 100644
--- a/core/src/test/scala/unit/kafka/network/RequestChannelTest.scala
+++ b/core/src/test/scala/unit/kafka/network/RequestChannelTest.scala
@@ -182,8 +182,7 @@ class RequestChannelTest {
       new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "user"),
       ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT),
       SecurityProtocol.PLAINTEXT,
-      new ClientInformation("name", "version"),
-      false)
+      new ClientInformation("name", "version"))
   }
 
   private def toMap(config: Config): Map[String, String] = {
diff --git a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
index 8190fb0..da18e9e 100644
--- a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
+++ b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
@@ -88,8 +88,6 @@ class SocketServerTest {
     // Run the tests with TRACE logging to exercise request logging path
     logLevelToRestore = kafkaLogger.getLevel
     kafkaLogger.setLevel(Level.TRACE)
-
-    assertTrue(server.controlPlaneRequestChannelOpt.isEmpty)
   }
 
   @After
@@ -304,7 +302,6 @@ class SocketServerTest {
     val config = KafkaConfig.fromProps(testProps)
     val testableServer = new TestableSocketServer(config)
     testableServer.startup(startProcessingRequests = false)
-
     val updatedEndPoints = config.advertisedListeners.map { endpoint =>
       endpoint.copy(port = testableServer.boundPort(endpoint.listenerName))
     }.map(_.toJava)
@@ -521,10 +518,10 @@ class SocketServerTest {
     val overrideConnectionId = "127.0.0.1:1-127.0.0.1:2-0"
     val overrideServer = new SocketServer(KafkaConfig.fromProps(props), serverMetrics, time, credentialProvider) {
       override def newProcessor(id: Int, requestChannel: RequestChannel, connectionQuotas: ConnectionQuotas, listenerName: ListenerName,
-                                protocol: SecurityProtocol, memoryPool: MemoryPool, isPrivilegedListener: Boolean = false): Processor = {
+                                protocol: SecurityProtocol, memoryPool: MemoryPool): Processor = {
         new Processor(id, time, config.socketRequestMaxBytes, dataPlaneRequestChannel, connectionQuotas,
           config.connectionsMaxIdleMs, config.failedAuthenticationDelayMs, listenerName, protocol, config, metrics,
-          credentialProvider, memoryPool, new LogContext(), isPrivilegedListener = isPrivilegedListener) {
+          credentialProvider, memoryPool, new LogContext()) {
             override protected[network] def connectionId(socket: Socket): String = overrideConnectionId
             override protected[network] def createSelector(channelBuilder: ChannelBuilder): Selector = {
              val testableSelector = new TestableSelector(config, channelBuilder, time, metrics)
@@ -1021,10 +1018,10 @@ class SocketServerTest {
     var conn: Socket = null
     val overrideServer = new SocketServer(KafkaConfig.fromProps(props), serverMetrics, Time.SYSTEM, credentialProvider) {
       override def newProcessor(id: Int, requestChannel: RequestChannel, connectionQuotas: ConnectionQuotas, listenerName: ListenerName,
-                                protocol: SecurityProtocol, memoryPool: MemoryPool, isPrivilegedListener: Boolean = false): Processor = {
+                                protocol: SecurityProtocol, memoryPool: MemoryPool): Processor = {
         new Processor(id, time, config.socketRequestMaxBytes, dataPlaneRequestChannel, connectionQuotas,
           config.connectionsMaxIdleMs, config.failedAuthenticationDelayMs, listenerName, protocol, config, metrics,
-          credentialProvider, MemoryPool.NONE, new LogContext(), isPrivilegedListener = isPrivilegedListener) {
+          credentialProvider, MemoryPool.NONE, new LogContext()) {
           override protected[network] def sendResponse(response: RequestChannel.Response, responseSend: Send): Unit = {
             conn.close()
             super.sendResponse(response, responseSend)
@@ -1061,10 +1058,10 @@ class SocketServerTest {
     @volatile var selector: TestableSelector = null
     val overrideServer = new SocketServer(KafkaConfig.fromProps(props), serverMetrics, Time.SYSTEM, credentialProvider) {
       override def newProcessor(id: Int, requestChannel: RequestChannel, connectionQuotas: ConnectionQuotas, listenerName: ListenerName,
-                                protocol: SecurityProtocol, memoryPool: MemoryPool, isPrivilegedListener: Boolean = false): Processor = {
+                                protocol: SecurityProtocol, memoryPool: MemoryPool): Processor = {
         new Processor(id, time, config.socketRequestMaxBytes, dataPlaneRequestChannel, connectionQuotas,
           config.connectionsMaxIdleMs, config.failedAuthenticationDelayMs, listenerName, protocol, config, metrics,
-          credentialProvider, memoryPool, new LogContext(), isPrivilegedListener = isPrivilegedListener) {
+          credentialProvider, memoryPool, new LogContext()) {
           override protected[network] def createSelector(channelBuilder: ChannelBuilder): Selector = {
            val testableSelector = new TestableSelector(config, channelBuilder, time, metrics)
            selector = testableSelector
@@ -1476,9 +1473,6 @@ class SocketServerTest {
     props ++= sslServerProps
     val testableServer = new TestableSocketServer(time = time)
     testableServer.startup()
-
-    assertTrue(testableServer.controlPlaneRequestChannelOpt.isEmpty)
-
     val proxyServer = new ProxyServer(testableServer)
     try {
       val testableSelector = testableServer.testableSelector
@@ -1676,75 +1670,6 @@ class SocketServerTest {
     }
   }
 
-
-  @Test
-  def testControlPlaneAsPrivilegedListener(): Unit = {
-    val testProps = new Properties
-    testProps ++= props
-    testProps.put("listeners", "PLAINTEXT://localhost:0,CONTROLLER://localhost:0")
-    testProps.put("listener.security.protocol.map", "PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT")
-    testProps.put("control.plane.listener.name", "CONTROLLER")
-    val config = KafkaConfig.fromProps(testProps)
-    withTestableServer(config, { testableServer =>
-      val controlPlaneSocket = connect(testableServer, config.controlPlaneListenerName.get,
-        localAddr = InetAddress.getLocalHost)
-      val sentRequest = sendAndReceiveControllerRequest(controlPlaneSocket, testableServer)
-      assertTrue(sentRequest.context.fromPrivilegedListener)
-
-      val plainSocket = connect(testableServer, localAddr = InetAddress.getLocalHost)
-      val plainRequest = sendAndReceiveRequest(plainSocket, testableServer)
-      assertFalse(plainRequest.context.fromPrivilegedListener)
-    })
-  }
-
-  @Test
-  def testInterBrokerListenerAsPrivilegedListener(): Unit = {
-    val testProps = new Properties
-    testProps ++= props
-    testProps.put("listeners", "EXTERNAL://localhost:0,INTERNAL://localhost:0")
-    testProps.put("listener.security.protocol.map", "EXTERNAL:PLAINTEXT,INTERNAL:PLAINTEXT")
-    testProps.put("inter.broker.listener.name", "INTERNAL")
-    val config = KafkaConfig.fromProps(testProps)
-    withTestableServer(config, { testableServer =>
-      val interBrokerSocket = connect(testableServer, config.interBrokerListenerName,
-        localAddr = InetAddress.getLocalHost)
-      val sentRequest = sendAndReceiveRequest(interBrokerSocket, testableServer)
-      assertTrue(sentRequest.context.fromPrivilegedListener)
-
-      val externalSocket = connect(testableServer, new ListenerName("EXTERNAL"),
-        localAddr = InetAddress.getLocalHost)
-      val externalRequest = sendAndReceiveRequest(externalSocket, testableServer)
-      assertFalse(externalRequest.context.fromPrivilegedListener)
-    })
-  }
-
-  @Test
-  def testControlPlaneTakePrecedenceOverInterBrokerListenerAsPrivilegedListener(): Unit = {
-    val testProps = new Properties
-    testProps ++= props
-    testProps.put("listeners", "EXTERNAL://localhost:0,INTERNAL://localhost:0,CONTROLLER://localhost:0")
-    testProps.put("listener.security.protocol.map", "EXTERNAL:PLAINTEXT,INTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT")
-    testProps.put("control.plane.listener.name", "CONTROLLER")
-    testProps.put("inter.broker.listener.name", "INTERNAL")
-    val config = KafkaConfig.fromProps(testProps)
-    withTestableServer(config, { testableServer =>
-      val controlPlaneSocket = connect(testableServer, config.controlPlaneListenerName.get,
-        localAddr = InetAddress.getLocalHost)
-      val controlPlaneRequest = sendAndReceiveControllerRequest(controlPlaneSocket, testableServer)
-      assertTrue(controlPlaneRequest.context.fromPrivilegedListener)
-
-      val interBrokerSocket = connect(testableServer, config.interBrokerListenerName,
-        localAddr = InetAddress.getLocalHost)
-      val interBrokerRequest = sendAndReceiveRequest(interBrokerSocket, testableServer)
-      assertFalse(interBrokerRequest.context.fromPrivilegedListener)
-
-      val externalSocket = connect(testableServer, new ListenerName("EXTERNAL"),
-        localAddr = InetAddress.getLocalHost)
-      val externalRequest = sendAndReceiveRequest(externalSocket, testableServer)
-      assertFalse(externalRequest.context.fromPrivilegedListener)
-    })
-  }
-
   private def sslServerProps: Properties = {
     val trustStoreFile = File.createTempFile("truststore", ".jks")
     val sslProps = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, interBrokerSecurityProtocol = Some(SecurityProtocol.SSL),
@@ -1820,10 +1745,10 @@ class SocketServerTest {
     @volatile var uncaughtExceptions = 0
 
     override def newProcessor(id: Int, requestChannel: RequestChannel, connectionQuotas: ConnectionQuotas, listenerName: ListenerName,
-                              protocol: SecurityProtocol, memoryPool: MemoryPool, isPrivilegedListener: Boolean = false): Processor = {
+                                protocol: SecurityProtocol, memoryPool: MemoryPool): Processor = {
       new Processor(id, time, config.socketRequestMaxBytes, requestChannel, connectionQuotas, config.connectionsMaxIdleMs,
         config.failedAuthenticationDelayMs, listenerName, protocol, config, metrics, credentialProvider,
-        memoryPool, new LogContext(), connectionQueueSize, isPrivilegedListener) {
+        memoryPool, new LogContext(), connectionQueueSize) {
 
         override protected[network] def createSelector(channelBuilder: ChannelBuilder): Selector = {
            val testableSelector = new TestableSelector(config, channelBuilder, time, metrics, metricTags.asScala)
diff --git a/core/src/test/scala/unit/kafka/security/authorizer/AclAuthorizerTest.scala b/core/src/test/scala/unit/kafka/security/authorizer/AclAuthorizerTest.scala
index a5c57b6..163b1fb 100644
--- a/core/src/test/scala/unit/kafka/security/authorizer/AclAuthorizerTest.scala
+++ b/core/src/test/scala/unit/kafka/security/authorizer/AclAuthorizerTest.scala
@@ -1037,7 +1037,7 @@ class AclAuthorizerTest extends ZooKeeperTestHarness {
     val securityProtocol = SecurityProtocol.SASL_PLAINTEXT
     val header = new RequestHeader(apiKey, 2, "", 1) //ApiKeys apiKey, short version, String clientId, int correlation
     new RequestContext(header, "", clientAddress, principal, ListenerName.forSecurityProtocol(securityProtocol),
-      securityProtocol, ClientInformation.EMPTY, false)
+      securityProtocol, ClientInformation.EMPTY)
   }
 
   private def authorize(authorizer: AclAuthorizer, requestContext: RequestContext, operation: AclOperation, resource: ResourcePattern): Boolean = {
diff --git a/core/src/test/scala/unit/kafka/server/BaseClientQuotaManagerTest.scala b/core/src/test/scala/unit/kafka/server/BaseClientQuotaManagerTest.scala
index 1851306..6d4b5be 100644
--- a/core/src/test/scala/unit/kafka/server/BaseClientQuotaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/BaseClientQuotaManagerTest.scala
@@ -60,7 +60,7 @@ class BaseClientQuotaManagerTest {
   }
 
   protected def buildRequest[T <: AbstractRequest](builder: AbstractRequest.Builder[T],
-                                                   listenerName: ListenerName = ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT)): (T, RequestChannel.Request) = {
+                                                 listenerName: ListenerName = ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT)): (T, RequestChannel.Request) = {
 
     val request = builder.build()
     val buffer = request.serialize(new RequestHeader(builder.apiKey, request.version, "", 0))
@@ -69,7 +69,7 @@ class BaseClientQuotaManagerTest {
     // read the header from the buffer first so that the body can be read next from the Request constructor
     val header = RequestHeader.parse(buffer)
     val context = new RequestContext(header, "1", InetAddress.getLocalHost, KafkaPrincipal.ANONYMOUS,
-      listenerName, SecurityProtocol.PLAINTEXT, ClientInformation.EMPTY, false)
+      listenerName, SecurityProtocol.PLAINTEXT, ClientInformation.EMPTY)
     (request, new RequestChannel.Request(processor = 1, context = context, startTimeNanos =  0, MemoryPool.NONE, buffer,
       requestChannelMetrics))
   }
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index 90be674..e4136ed 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -153,7 +153,7 @@ class KafkaApisTest {
       clientId, 0)
     val requestContext = new RequestContext(requestHeader, "1", InetAddress.getLocalHost,
       KafkaPrincipal.ANONYMOUS, ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT),
-      SecurityProtocol.PLAINTEXT, ClientInformation.EMPTY, false)
+      SecurityProtocol.PLAINTEXT, ClientInformation.EMPTY)
 
     val expectedActions = Seq(
       new Action(operation, new ResourcePattern(resourceType, resourceName, PatternType.LITERAL),
@@ -187,7 +187,7 @@ class KafkaApisTest {
       clientId, 0)
     val requestContext = new RequestContext(requestHeader, "1", InetAddress.getLocalHost,
       KafkaPrincipal.ANONYMOUS, ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT),
-      SecurityProtocol.PLAINTEXT, ClientInformation.EMPTY, false)
+      SecurityProtocol.PLAINTEXT, ClientInformation.EMPTY)
 
     val expectedActions = Seq(
       new Action(operation, new ResourcePattern(resourceType, resourceName1, PatternType.LITERAL),
@@ -2260,7 +2260,7 @@ class KafkaApisTest {
     // read the header from the buffer first so that the body can be read next from the Request constructor
     val header = RequestHeader.parse(buffer)
     val context = new RequestContext(header, "1", InetAddress.getLocalHost, KafkaPrincipal.ANONYMOUS,
-      listenerName, SecurityProtocol.PLAINTEXT, ClientInformation.EMPTY, false)
+      listenerName, SecurityProtocol.PLAINTEXT, ClientInformation.EMPTY)
     new RequestChannel.Request(processor = 1, context = context, startTimeNanos = 0, MemoryPool.NONE, buffer,
       requestChannelMetrics)
   }
diff --git a/core/src/test/scala/unit/kafka/server/ThrottledChannelExpirationTest.scala b/core/src/test/scala/unit/kafka/server/ThrottledChannelExpirationTest.scala
index ff33084..bffbeec 100644
--- a/core/src/test/scala/unit/kafka/server/ThrottledChannelExpirationTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ThrottledChannelExpirationTest.scala
@@ -56,7 +56,7 @@ class ThrottledChannelExpirationTest {
     // read the header from the buffer first so that the body can be read next from the Request constructor
     val header = RequestHeader.parse(buffer)
     val context = new RequestContext(header, "1", InetAddress.getLocalHost, KafkaPrincipal.ANONYMOUS,
-      listenerName, SecurityProtocol.PLAINTEXT, ClientInformation.EMPTY, false)
+      listenerName, SecurityProtocol.PLAINTEXT, ClientInformation.EMPTY)
     (request, new RequestChannel.Request(processor = 1, context = context, startTimeNanos =  0, MemoryPool.NONE, buffer,
       requestChannelMetrics))
   }
diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/acl/AclAuthorizerBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/acl/AclAuthorizerBenchmark.java
index 060ff3a..12ac243 100644
--- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/acl/AclAuthorizerBenchmark.java
+++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/acl/AclAuthorizerBenchmark.java
@@ -96,7 +96,7 @@ public class AclAuthorizerBenchmark {
             1, true, true));
         context = new RequestContext(new RequestHeader(ApiKeys.PRODUCE, Integer.valueOf(1).shortValue(),
             "someclient", 1), "1", InetAddress.getLocalHost(), KafkaPrincipal.ANONYMOUS,
-            ListenerName.normalised("listener"), SecurityProtocol.PLAINTEXT, ClientInformation.EMPTY, false);
+            ListenerName.normalised("listener"), SecurityProtocol.PLAINTEXT, ClientInformation.EMPTY);
     }
 
     private void setFieldValue(Object obj, String fieldName, Object value) throws Exception {
diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/MetadataRequestBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/MetadataRequestBenchmark.java
index e2242c1..95c0e50 100644
--- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/MetadataRequestBenchmark.java
+++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/MetadataRequestBenchmark.java
@@ -202,8 +202,7 @@ public class MetadataRequestBenchmark {
         RequestHeader header = RequestHeader.parse(buffer);
 
         RequestContext context = new RequestContext(header, "1", null, principal,
-            ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT),
-            SecurityProtocol.PLAINTEXT, ClientInformation.EMPTY, false);
+            ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT), SecurityProtocol.PLAINTEXT, ClientInformation.EMPTY);
         return new RequestChannel.Request(1, context, 0, MemoryPool.NONE, buffer, requestChannelMetrics);
     }