Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/10/26 10:52:26 UTC

[GitHub] [kafka] rajinisivaram commented on a change in pull request #9103: KAFKA-10181: Use Envelope RPC to do redirection for (Incremental)AlterConfig, AlterClientQuota and CreateTopics

rajinisivaram commented on a change in pull request #9103:
URL: https://github.com/apache/kafka/pull/9103#discussion_r511872018



##########
File path: core/src/main/scala/kafka/network/RequestChannel.scala
##########
@@ -94,19 +104,63 @@ object RequestChannel extends Logging {
     @volatile var recordNetworkThreadTimeCallback: Option[Long => Unit] = None
 
     val session = Session(context.principal, context.clientAddress)
+
     private val bodyAndSize: RequestAndSize = context.parseRequest(buffer)
 
     def header: RequestHeader = context.header
     def sizeOfBodyInBytes: Int = bodyAndSize.size
 
-    //most request types are parsed entirely into objects at this point. for those we can release the underlying buffer.
-    //some (like produce, or any time the schema contains fields of types BYTES or NULLABLE_BYTES) retain a reference
-    //to the buffer. for those requests we cannot release the buffer early, but only when request processing is done.
+    // most request types are parsed entirely into objects at this point. for those we can release the underlying buffer.
+    // some (like produce, or any time the schema contains fields of types BYTES or NULLABLE_BYTES) retain a reference
+    // to the buffer. for those requests we cannot release the buffer early, but only when request processing is done.
     if (!header.apiKey.requiresDelayedAllocation) {
       releaseBuffer()
     }
 
-    def requestDesc(details: Boolean): String = s"$header -- ${loggableRequest.toString(details)}"
+    def buildResponse(abstractResponse: AbstractResponse,
+                      error: Errors): Send = {
+      envelopeContext match {
+        case Some(envelopeContext) =>
+          val envelopeResponse = new EnvelopeResponse(
+            abstractResponse.throttleTimeMs(),

Review comment:
       I guess the only quota that is affected for the RPCs we currently forward is request quotas. Totally agree that we shouldn't throttle inter-broker connections. There are a few other things to consider here:
   
   1) Every forwarded request uses network thread and request handler time on two brokers. Are we saying that we can ignore the time spent on the forwarding broker because it is negligible? In a deployment with SSL on the external listener and PLAINTEXT on the inter-broker listener, more network thread time may be used on the forwarding broker than on the controller. Do we record these times, but use the controller throttle time for throttling?
   2) Are we changing the semantics of quotas? For example, if a client sends request1 to leastLoadedNode A, which mutes the connection, and then sends request2 to leastLoadedNode B, which happens to be the controller, we would mute that connection too. Another client with the same principal would get muted on B, but not on A, because A's quota hasn't been violated. I think this should be ok, though a bit confusing.
   3) Are these measures good enough to protect the controller? This is the one that needs some more thought. Request quotas are configured to allocate a percentage of thread usage to each principal. Our quotas aren't very good at protecting against DoS attacks, but they help to limit usage for normal clients using the APIs. So if we can make sure the implementation for forwarded requests can handle this case, it would be good enough. In the old world, a client doing a lot of config updates would have just distributed the load across brokers as each node was throttled. Now, we distribute the initial request across brokers as the controller decides to throttle. The total rate for these requests across the cluster is dramatically reduced because all the load is now on the controller. But from the controller broker's point of view, we are now allowing more requests through for the same quota from every client, because a client can forward through `n` brokers. @apovzner may have more context on whether these request types actually hit request quotas in real deployments.
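
To make the arithmetic behind point 3 concrete, here is a toy model in Scala. It is only a sketch and not Kafka's quota code: the object and method names are hypothetical, and it assumes that a request quota is a per-broker percentage of handler time and that each client connection can get one request through before it is muted.

```scala
// Toy model only; none of these names exist in Kafka.
object ForwardedQuotaSketch {

  // Old world (assumption): each broker does the work locally and enforces
  // the principal's quota on its own threads, so the cluster-wide sustained
  // share is the sum of the per-broker quotas.
  def sustainedPercentLocal(brokers: Int, quotaPercent: Int): Int =
    brokers * quotaPercent

  // Forwarded world (assumption): all the work lands on the controller, so
  // the sustained share is capped by the controller's quota alone.
  def sustainedPercentForwarded(quotaPercent: Int): Int =
    quotaPercent

  // Requests that can reach the controller before every connection the
  // client holds has been muted, assuming one in-flight request per
  // connection and one connection per forwarding broker.
  def burstBeforeMuted(forwardingBrokers: Int): Int =
    forwardingBrokers

  def main(args: Array[String]): Unit = {
    val brokers = 5
    val quotaPercent = 1
    println(s"sustained, local handling: ${sustainedPercentLocal(brokers, quotaPercent)}%")
    println(s"sustained, forwarded:      ${sustainedPercentForwarded(quotaPercent)}%")
    println(s"burst reaching controller: ${burstBeforeMuted(brokers)} requests")
  }
}
```

Under these assumptions the sustained cluster-wide rate drops when requests are forwarded, while the number of requests the controller may see from one client before throttling takes effect grows with the number of brokers the client can forward through.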



