You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/08/07 22:43:44 UTC

[GitHub] [kafka] abbccdda opened a new pull request #9144: KAFKA-9705: (part-1) add redirection fields in the request header

abbccdda opened a new pull request #9144:
URL: https://github.com/apache/kafka/pull/9144


   Add the redirection supporting fields, including:
   
   1. initial principal name
   2. initial client id
   3. the flag to indicate whether a given request is coming from the control plane in a secured environment.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] dajac commented on a change in pull request #9144: KAFKA-9705: (part-1) add redirection fields in the request header

Posted by GitBox <gi...@apache.org>.
dajac commented on a change in pull request #9144:
URL: https://github.com/apache/kafka/pull/9144#discussion_r468090360



##########
File path: clients/src/main/java/org/apache/kafka/common/requests/RequestContext.java
##########
@@ -74,7 +77,9 @@ public RequestAndSize parseRequest(ByteBuffer buffer) {
                         ", apiVersion: " + header.apiVersion() +
                         ", connectionId: " + connectionId +
                         ", listenerName: " + listenerName +
-                        ", principal: " + principal, ex);
+                        ", principal: " + principal +
+                        ", initial principal: " + initialPrincipalName() +
+                        ", initial client id: " + header.initialClientId(), ex);

Review comment:
       nit: Could we use came case like the others?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] cmccabe commented on a change in pull request #9144: KAFKA-9705: (part-1) add redirection fields in the request header

Posted by GitBox <gi...@apache.org>.
cmccabe commented on a change in pull request #9144:
URL: https://github.com/apache/kafka/pull/9144#discussion_r467330705



##########
File path: clients/src/main/java/org/apache/kafka/server/authorizer/AuthorizableRequestContext.java
##########
@@ -69,4 +69,11 @@
      * Returns the correlation id from the request header.
      */
     int correlationId();
+
+    /**
+     * Returns the initial principal name for a forwarded request.
+     */
+    default String initialPrincipalName() {

Review comment:
       Does this need to be here?  I'm concerned that having it here will eventually lead to us using it for authorization, which it shouldn't be.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] abbccdda commented on a change in pull request #9144: KAFKA-9705: (part-1) add redirection fields in the request header

Posted by GitBox <gi...@apache.org>.
abbccdda commented on a change in pull request #9144:
URL: https://github.com/apache/kafka/pull/9144#discussion_r468939109



##########
File path: clients/src/main/java/org/apache/kafka/clients/KafkaClient.java
##########
@@ -189,16 +189,18 @@ ClientRequest newClientRequest(String nodeId, AbstractRequest.Builder<?> request
      *                         cancelling the request. The request may get cancelled sooner if the socket disconnects
      *                         for any reason including if another pending request to the same node timed out first.
      * @param callback the callback to invoke when we get a response
+     * @param initialPrincipalName the initial client principal name, when building a forward request
+     * @param initialClientId the initial client id, when building a forward request
      */
     ClientRequest newClientRequest(String nodeId,
                                    AbstractRequest.Builder<?> requestBuilder,
                                    long createdTimeMs,
                                    boolean expectResponse,
                                    int requestTimeoutMs,
+                                   String initialPrincipalName,

Review comment:
       I'm not sure it's meaningful to add an overload function just for the sake of minimizing code changes. Plus, we should not underestimate the more code changes potentially being added when we have 2 almost identical request builder function.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] cmccabe commented on pull request #9144: KAFKA-9705: (part-1) add redirection fields in the request header

Posted by GitBox <gi...@apache.org>.
cmccabe commented on pull request #9144:
URL: https://github.com/apache/kafka/pull/9144#issuecomment-675703162


   LGTM


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] cmccabe commented on a change in pull request #9144: KAFKA-9705: (part-1) add redirection fields in the request header

Posted by GitBox <gi...@apache.org>.
cmccabe commented on a change in pull request #9144:
URL: https://github.com/apache/kafka/pull/9144#discussion_r467331218



##########
File path: core/src/main/scala/kafka/network/SocketServer.scala
##########
@@ -88,15 +88,17 @@ class SocketServer(val config: KafkaConfig,
   private val memoryPoolDepletedTimeMetricName = metrics.metricName("MemoryPoolDepletedTimeTotal", MetricsGroup)
   memoryPoolSensor.add(new Meter(TimeUnit.MILLISECONDS, memoryPoolDepletedPercentMetricName, memoryPoolDepletedTimeMetricName))
   private val memoryPool = if (config.queuedMaxBytes > 0) new SimpleMemoryPool(config.queuedMaxBytes, config.socketRequestMaxBytes, false, memoryPoolSensor) else MemoryPool.NONE
-  // data-plane
-  private val dataPlaneProcessors = new ConcurrentHashMap[Int, Processor]()
-  private[network] val dataPlaneAcceptors = new ConcurrentHashMap[EndPoint, Acceptor]()
-  val dataPlaneRequestChannel = new RequestChannel(maxQueuedRequests, DataPlaneMetricPrefix, time)
   // control-plane
   private var controlPlaneProcessorOpt : Option[Processor] = None
   private[network] var controlPlaneAcceptorOpt : Option[Acceptor] = None
   val controlPlaneRequestChannelOpt: Option[RequestChannel] = config.controlPlaneListenerName.map(_ =>
-    new RequestChannel(20, ControlPlaneMetricPrefix, time))
+    new RequestChannel(20, ControlPlaneMetricPrefix, time, true))
+  // data-plane
+  private val dataPlaneProcessors = new ConcurrentHashMap[Int, Processor]()
+  private[network] val dataPlaneAcceptors = new ConcurrentHashMap[EndPoint, Acceptor]()
+  // If the control plane processor is not defined, just set the flag to true in data plane to bypass the check for whether a given
+  // request is from the control plane or not.

Review comment:
       I don't think this is quite right.  The flag should be set based on whether we're in the control plane listener OR the inter-broker listener.  It's much more common to use a separate inter-broker listener, than to use a fully separate control plane listener.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] abbccdda commented on a change in pull request #9144: KAFKA-9705: (part-1) add redirection fields in the request header

Posted by GitBox <gi...@apache.org>.
abbccdda commented on a change in pull request #9144:
URL: https://github.com/apache/kafka/pull/9144#discussion_r468939109



##########
File path: clients/src/main/java/org/apache/kafka/clients/KafkaClient.java
##########
@@ -189,16 +189,18 @@ ClientRequest newClientRequest(String nodeId, AbstractRequest.Builder<?> request
      *                         cancelling the request. The request may get cancelled sooner if the socket disconnects
      *                         for any reason including if another pending request to the same node timed out first.
      * @param callback the callback to invoke when we get a response
+     * @param initialPrincipalName the initial client principal name, when building a forward request
+     * @param initialClientId the initial client id, when building a forward request
      */
     ClientRequest newClientRequest(String nodeId,
                                    AbstractRequest.Builder<?> requestBuilder,
                                    long createdTimeMs,
                                    boolean expectResponse,
                                    int requestTimeoutMs,
+                                   String initialPrincipalName,

Review comment:
       I'm not sure it's meaningful to add an overload function just for the sake of minimizing code changes.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] cmccabe commented on a change in pull request #9144: KAFKA-9705: (part-1) add redirection fields in the request header

Posted by GitBox <gi...@apache.org>.
cmccabe commented on a change in pull request #9144:
URL: https://github.com/apache/kafka/pull/9144#discussion_r468841825



##########
File path: clients/src/main/java/org/apache/kafka/server/authorizer/AuthorizableRequestContext.java
##########
@@ -69,4 +69,11 @@
      * Returns the correlation id from the request header.
      */
     int correlationId();
+
+    /**
+     * Returns the initial principal name for a forwarded request.
+     */
+    default String initialPrincipalName() {

Review comment:
       Yes, can you add a comment saying this should be used for logging, but not for authorization?

##########
File path: clients/src/main/java/org/apache/kafka/server/authorizer/AuthorizableRequestContext.java
##########
@@ -69,4 +69,11 @@
      * Returns the correlation id from the request header.
      */
     int correlationId();
+
+    /**
+     * Returns the initial principal name for a forwarded request.
+     */
+    default String initialPrincipalName() {

Review comment:
       Yes, can you add a comment here saying this should be used for logging, but not for authorization?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] abbccdda commented on a change in pull request #9144: KAFKA-9705: (part-1) add redirection fields in the request header

Posted by GitBox <gi...@apache.org>.
abbccdda commented on a change in pull request #9144:
URL: https://github.com/apache/kafka/pull/9144#discussion_r468005080



##########
File path: core/src/main/scala/kafka/network/SocketServer.scala
##########
@@ -88,15 +88,17 @@ class SocketServer(val config: KafkaConfig,
   private val memoryPoolDepletedTimeMetricName = metrics.metricName("MemoryPoolDepletedTimeTotal", MetricsGroup)
   memoryPoolSensor.add(new Meter(TimeUnit.MILLISECONDS, memoryPoolDepletedPercentMetricName, memoryPoolDepletedTimeMetricName))
   private val memoryPool = if (config.queuedMaxBytes > 0) new SimpleMemoryPool(config.queuedMaxBytes, config.socketRequestMaxBytes, false, memoryPoolSensor) else MemoryPool.NONE
-  // data-plane
-  private val dataPlaneProcessors = new ConcurrentHashMap[Int, Processor]()
-  private[network] val dataPlaneAcceptors = new ConcurrentHashMap[EndPoint, Acceptor]()
-  val dataPlaneRequestChannel = new RequestChannel(maxQueuedRequests, DataPlaneMetricPrefix, time)
   // control-plane
   private var controlPlaneProcessorOpt : Option[Processor] = None
   private[network] var controlPlaneAcceptorOpt : Option[Acceptor] = None
   val controlPlaneRequestChannelOpt: Option[RequestChannel] = config.controlPlaneListenerName.map(_ =>
-    new RequestChannel(20, ControlPlaneMetricPrefix, time))
+    new RequestChannel(20, ControlPlaneMetricPrefix, time, true))
+  // data-plane
+  private val dataPlaneProcessors = new ConcurrentHashMap[Int, Processor]()
+  private[network] val dataPlaneAcceptors = new ConcurrentHashMap[EndPoint, Acceptor]()
+  // If the control plane processor is not defined, just set the flag to true in data plane to bypass the check for whether a given
+  // request is from the control plane or not.

Review comment:
       What if the user doesn't configure an inter-broker listener?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] cmccabe commented on a change in pull request #9144: KAFKA-9705: (part-1) add redirection fields in the request header

Posted by GitBox <gi...@apache.org>.
cmccabe commented on a change in pull request #9144:
URL: https://github.com/apache/kafka/pull/9144#discussion_r468841223



##########
File path: clients/src/main/java/org/apache/kafka/clients/KafkaClient.java
##########
@@ -189,16 +189,18 @@ ClientRequest newClientRequest(String nodeId, AbstractRequest.Builder<?> request
      *                         cancelling the request. The request may get cancelled sooner if the socket disconnects
      *                         for any reason including if another pending request to the same node timed out first.
      * @param callback the callback to invoke when we get a response
+     * @param initialPrincipalName the initial client principal name, when building a forward request
+     * @param initialClientId the initial client id, when building a forward request
      */
     ClientRequest newClientRequest(String nodeId,
                                    AbstractRequest.Builder<?> requestBuilder,
                                    long createdTimeMs,
                                    boolean expectResponse,
                                    int requestTimeoutMs,
+                                   String initialPrincipalName,

Review comment:
       can we have an overload for this that doesn't have these two fields, and defaults them to null?  It would really help avoid having so much churn in junit tests.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] cmccabe commented on pull request #9144: KAFKA-9705: (part-1) add redirection fields in the request header

Posted by GitBox <gi...@apache.org>.
cmccabe commented on pull request #9144:
URL: https://github.com/apache/kafka/pull/9144#issuecomment-672261745


   We should also have a test of the privileged listener logic, to ensure that the right listeners are privileged in various configurations (not sure what junit test file that could go in ... socket server test maybe?)  Ideally we would do it as a unit test and not open sockets, etc.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] cmccabe commented on a change in pull request #9144: KAFKA-9705: (part-1) add redirection fields in the request header

Posted by GitBox <gi...@apache.org>.
cmccabe commented on a change in pull request #9144:
URL: https://github.com/apache/kafka/pull/9144#discussion_r468843129



##########
File path: core/src/main/scala/kafka/network/SocketServer.scala
##########
@@ -88,15 +88,17 @@ class SocketServer(val config: KafkaConfig,
   private val memoryPoolDepletedTimeMetricName = metrics.metricName("MemoryPoolDepletedTimeTotal", MetricsGroup)
   memoryPoolSensor.add(new Meter(TimeUnit.MILLISECONDS, memoryPoolDepletedPercentMetricName, memoryPoolDepletedTimeMetricName))
   private val memoryPool = if (config.queuedMaxBytes > 0) new SimpleMemoryPool(config.queuedMaxBytes, config.socketRequestMaxBytes, false, memoryPoolSensor) else MemoryPool.NONE
-  // data-plane
-  private val dataPlaneProcessors = new ConcurrentHashMap[Int, Processor]()
-  private[network] val dataPlaneAcceptors = new ConcurrentHashMap[EndPoint, Acceptor]()
-  val dataPlaneRequestChannel = new RequestChannel(maxQueuedRequests, DataPlaneMetricPrefix, time)
   // control-plane
   private var controlPlaneProcessorOpt : Option[Processor] = None
   private[network] var controlPlaneAcceptorOpt : Option[Acceptor] = None
   val controlPlaneRequestChannelOpt: Option[RequestChannel] = config.controlPlaneListenerName.map(_ =>
-    new RequestChannel(20, ControlPlaneMetricPrefix, time))
+    new RequestChannel(20, ControlPlaneMetricPrefix, time, true))
+  // data-plane
+  private val dataPlaneProcessors = new ConcurrentHashMap[Int, Processor]()
+  private[network] val dataPlaneAcceptors = new ConcurrentHashMap[EndPoint, Acceptor]()
+  // If the control plane processor is not defined, just set the flag to true in data plane to bypass the check for whether a given
+  // request is from the control plane or not.

Review comment:
       From the above:
   1. if we have a control plane listener, the privileged listener is that one, and only that one
   2. otherwise, if we have inter-broker listeners, only those are privileged,
   3. otherwise, all listeners are privileged
   
   We should have this in the JavaDoc for SocketServer




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] cmccabe commented on a change in pull request #9144: KAFKA-9705: (part-1) add redirection fields in the request header

Posted by GitBox <gi...@apache.org>.
cmccabe commented on a change in pull request #9144:
URL: https://github.com/apache/kafka/pull/9144#discussion_r472477998



##########
File path: clients/src/main/java/org/apache/kafka/clients/KafkaClient.java
##########
@@ -189,16 +189,18 @@ ClientRequest newClientRequest(String nodeId, AbstractRequest.Builder<?> request
      *                         cancelling the request. The request may get cancelled sooner if the socket disconnects
      *                         for any reason including if another pending request to the same node timed out first.
      * @param callback the callback to invoke when we get a response
+     * @param initialPrincipalName the initial client principal name, when building a forward request
+     * @param initialClientId the initial client id, when building a forward request
      */
     ClientRequest newClientRequest(String nodeId,
                                    AbstractRequest.Builder<?> requestBuilder,
                                    long createdTimeMs,
                                    boolean expectResponse,
                                    int requestTimeoutMs,
+                                   String initialPrincipalName,

Review comment:
       Fair enough




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] dajac commented on a change in pull request #9144: KAFKA-9705: (part-1) add redirection fields in the request header

Posted by GitBox <gi...@apache.org>.
dajac commented on a change in pull request #9144:
URL: https://github.com/apache/kafka/pull/9144#discussion_r468090360



##########
File path: clients/src/main/java/org/apache/kafka/common/requests/RequestContext.java
##########
@@ -74,7 +77,9 @@ public RequestAndSize parseRequest(ByteBuffer buffer) {
                         ", apiVersion: " + header.apiVersion() +
                         ", connectionId: " + connectionId +
                         ", listenerName: " + listenerName +
-                        ", principal: " + principal, ex);
+                        ", principal: " + principal +
+                        ", initial principal: " + initialPrincipalName() +
+                        ", initial client id: " + header.initialClientId(), ex);

Review comment:
       nit: Could we use camel case like the others?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] abbccdda commented on a change in pull request #9144: KAFKA-9705: (part-1) add redirection fields in the request header

Posted by GitBox <gi...@apache.org>.
abbccdda commented on a change in pull request #9144:
URL: https://github.com/apache/kafka/pull/9144#discussion_r467975757



##########
File path: clients/src/main/java/org/apache/kafka/common/requests/RequestHeader.java
##########
@@ -37,11 +37,22 @@ public RequestHeader(Struct struct, short headerVersion) {
     }
 
     public RequestHeader(ApiKeys requestApiKey, short requestVersion, String clientId, int correlationId) {
-        this(new RequestHeaderData().
-                setRequestApiKey(requestApiKey.id).
-                setRequestApiVersion(requestVersion).
-                setClientId(clientId).
-                setCorrelationId(correlationId),
+        this(requestApiKey, requestVersion, clientId, correlationId, null, null);
+    }
+
+    public RequestHeader(ApiKeys requestApiKey,

Review comment:
       We do have a constructor which doesn't contain the two fields above.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] cmccabe commented on a change in pull request #9144: KAFKA-9705: (part-1) add redirection fields in the request header

Posted by GitBox <gi...@apache.org>.
cmccabe commented on a change in pull request #9144:
URL: https://github.com/apache/kafka/pull/9144#discussion_r468840712



##########
File path: clients/src/main/java/org/apache/kafka/clients/ClientRequest.java
##########
@@ -42,6 +44,8 @@
      * @param clientId The client ID to use for the header
      * @param createdTimeMs The unix timestamp in milliseconds for the time at which this request was created.
      * @param expectResponse Should we expect a response message or is this request complete once it is sent?
+     * @param initialPrincipalName The initial principal name if this is a redirect request

Review comment:
       "or null if this was not redirected"

##########
File path: clients/src/main/java/org/apache/kafka/clients/ClientRequest.java
##########
@@ -42,6 +44,8 @@
      * @param clientId The client ID to use for the header
      * @param createdTimeMs The unix timestamp in milliseconds for the time at which this request was created.
      * @param expectResponse Should we expect a response message or is this request complete once it is sent?
+     * @param initialPrincipalName The initial principal name if this is a redirect request
+     * @param initialClientId The initial client id if this is a redirect request

Review comment:
       "or null if this was not redirected"




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] cmccabe merged pull request #9144: KAFKA-9705: (part-1) add redirection fields in the request header

Posted by GitBox <gi...@apache.org>.
cmccabe merged pull request #9144:
URL: https://github.com/apache/kafka/pull/9144


   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] cmccabe commented on a change in pull request #9144: KAFKA-9705: (part-1) add redirection fields in the request header

Posted by GitBox <gi...@apache.org>.
cmccabe commented on a change in pull request #9144:
URL: https://github.com/apache/kafka/pull/9144#discussion_r467330529



##########
File path: clients/src/main/java/org/apache/kafka/common/requests/RequestHeader.java
##########
@@ -37,11 +37,22 @@ public RequestHeader(Struct struct, short headerVersion) {
     }
 
     public RequestHeader(ApiKeys requestApiKey, short requestVersion, String clientId, int correlationId) {
-        this(new RequestHeaderData().
-                setRequestApiKey(requestApiKey.id).
-                setRequestApiVersion(requestVersion).
-                setClientId(clientId).
-                setCorrelationId(correlationId),
+        this(requestApiKey, requestVersion, clientId, correlationId, null, null);
+    }
+
+    public RequestHeader(ApiKeys requestApiKey,

Review comment:
       I think it would be good to have a constructor that didn't have the two forwarding fields, to avoid changing a large number of tests.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] cmccabe commented on a change in pull request #9144: KAFKA-9705: (part-1) add redirection fields in the request header

Posted by GitBox <gi...@apache.org>.
cmccabe commented on a change in pull request #9144:
URL: https://github.com/apache/kafka/pull/9144#discussion_r467331218



##########
File path: core/src/main/scala/kafka/network/SocketServer.scala
##########
@@ -88,15 +88,17 @@ class SocketServer(val config: KafkaConfig,
   private val memoryPoolDepletedTimeMetricName = metrics.metricName("MemoryPoolDepletedTimeTotal", MetricsGroup)
   memoryPoolSensor.add(new Meter(TimeUnit.MILLISECONDS, memoryPoolDepletedPercentMetricName, memoryPoolDepletedTimeMetricName))
   private val memoryPool = if (config.queuedMaxBytes > 0) new SimpleMemoryPool(config.queuedMaxBytes, config.socketRequestMaxBytes, false, memoryPoolSensor) else MemoryPool.NONE
-  // data-plane
-  private val dataPlaneProcessors = new ConcurrentHashMap[Int, Processor]()
-  private[network] val dataPlaneAcceptors = new ConcurrentHashMap[EndPoint, Acceptor]()
-  val dataPlaneRequestChannel = new RequestChannel(maxQueuedRequests, DataPlaneMetricPrefix, time)
   // control-plane
   private var controlPlaneProcessorOpt : Option[Processor] = None
   private[network] var controlPlaneAcceptorOpt : Option[Acceptor] = None
   val controlPlaneRequestChannelOpt: Option[RequestChannel] = config.controlPlaneListenerName.map(_ =>
-    new RequestChannel(20, ControlPlaneMetricPrefix, time))
+    new RequestChannel(20, ControlPlaneMetricPrefix, time, true))
+  // data-plane
+  private val dataPlaneProcessors = new ConcurrentHashMap[Int, Processor]()
+  private[network] val dataPlaneAcceptors = new ConcurrentHashMap[EndPoint, Acceptor]()
+  // If the control plane processor is not defined, just set the flag to true in data plane to bypass the check for whether a given
+  // request is from the control plane or not.

Review comment:
       I don't think this is quite right.  The flag should be set based on whether we're in the control plane listener OR the inter-broker listener.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] cmccabe commented on a change in pull request #9144: KAFKA-9705: (part-1) add redirection fields in the request header

Posted by GitBox <gi...@apache.org>.
cmccabe commented on a change in pull request #9144:
URL: https://github.com/apache/kafka/pull/9144#discussion_r468840231



##########
File path: clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java
##########
@@ -147,6 +153,7 @@ public KafkaChannel(String id, TransportLayer transportLayer, Supplier<Authentic
         this.disconnected = false;
         this.muteState = ChannelMuteState.NOT_MUTED;
         this.state = ChannelState.NOT_CONNECTED;
+        this.isInterBrokerListener = isInterBrokerListener;

Review comment:
       How about `privilegedListener`?  Then we could add a comment explaining what that means:
   1. if there is a control plane listener, only it is privileged
   2. otherwise, if there is an inter broker listener, only it is privileged
   3. otherwise, all listeners are privileged




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] abbccdda commented on a change in pull request #9144: KAFKA-9705: (part-1) add redirection fields in the request header

Posted by GitBox <gi...@apache.org>.
abbccdda commented on a change in pull request #9144:
URL: https://github.com/apache/kafka/pull/9144#discussion_r468939109



##########
File path: clients/src/main/java/org/apache/kafka/clients/KafkaClient.java
##########
@@ -189,16 +189,18 @@ ClientRequest newClientRequest(String nodeId, AbstractRequest.Builder<?> request
      *                         cancelling the request. The request may get cancelled sooner if the socket disconnects
      *                         for any reason including if another pending request to the same node timed out first.
      * @param callback the callback to invoke when we get a response
+     * @param initialPrincipalName the initial client principal name, when building a forward request
+     * @param initialClientId the initial client id, when building a forward request
      */
     ClientRequest newClientRequest(String nodeId,
                                    AbstractRequest.Builder<?> requestBuilder,
                                    long createdTimeMs,
                                    boolean expectResponse,
                                    int requestTimeoutMs,
+                                   String initialPrincipalName,

Review comment:
       I'm not sure it's meaningful to add an overload function just for the sake of minimizing code changes. Plus, I'm not sure whether this counts as a public interface, although it's not included in the standard java docs:
   ```
   What are the "public interfaces" of the project?
   
   - Binary log format
   - The network protocol and api behavior
   - Any class for which the build generates Javadoc.  
   ```




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] cmccabe commented on a change in pull request #9144: KAFKA-9705: (part-1) add redirection fields in the request header

Posted by GitBox <gi...@apache.org>.
cmccabe commented on a change in pull request #9144:
URL: https://github.com/apache/kafka/pull/9144#discussion_r468842602



##########
File path: core/src/main/scala/kafka/network/RequestChannel.scala
##########
@@ -309,7 +309,10 @@ object RequestChannel extends Logging {
   }
 }
 
-class RequestChannel(val queueSize: Int, val metricNamePrefix : String, time: Time) extends KafkaMetricsGroup {
+class RequestChannel(val queueSize: Int,
+                     val metricNamePrefix : String,
+                     time: Time,
+                     val maybeFromControlPlane: Boolean) extends KafkaMetricsGroup {

Review comment:
       I think `fromPrivilegedListener` is clearest




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] abbccdda commented on a change in pull request #9144: KAFKA-9705: (part-1) add redirection fields in the request header

Posted by GitBox <gi...@apache.org>.
abbccdda commented on a change in pull request #9144:
URL: https://github.com/apache/kafka/pull/9144#discussion_r467975394



##########
File path: clients/src/main/java/org/apache/kafka/server/authorizer/AuthorizableRequestContext.java
##########
@@ -69,4 +69,11 @@
      * Returns the correlation id from the request header.
      */
     int correlationId();
+
+    /**
+     * Returns the initial principal name for a forwarded request.
+     */
+    default String initialPrincipalName() {

Review comment:
       Yea, we need to have it here for audit logging. We could of course have the meta comment suggest "do not use for authorization"




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] cmccabe commented on a change in pull request #9144: KAFKA-9705: (part-1) add redirection fields in the request header

Posted by GitBox <gi...@apache.org>.
cmccabe commented on a change in pull request #9144:
URL: https://github.com/apache/kafka/pull/9144#discussion_r467330968



##########
File path: core/src/main/scala/kafka/network/RequestChannel.scala
##########
@@ -309,7 +309,10 @@ object RequestChannel extends Logging {
   }
 }
 
-class RequestChannel(val queueSize: Int, val metricNamePrefix : String, time: Time) extends KafkaMetricsGroup {
+class RequestChannel(val queueSize: Int,
+                     val metricNamePrefix : String,
+                     time: Time,
+                     val maybeFromControlPlane: Boolean) extends KafkaMetricsGroup {

Review comment:
       Instead of `maybeFromControlPlane`, how about `fromControlPlaneListener`




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org