You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2018/10/28 17:51:54 UTC

[kafka] 01/01: MINOR: Introduce KafkaChannel.newRequestContext

This is an automated email from the ASF dual-hosted git repository.

ijuma pushed a commit to branch kafka-channel-clean-up
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit fd4f4914367efc1d1aa5c681548ee716f831cbae
Author: Ismael Juma <is...@juma.me.uk>
AuthorDate: Sun Oct 28 10:50:39 2018 -0700

    MINOR: Introduce KafkaChannel.newRequestContext
    
    A minor clean-up that I noticed while looking at the
    reauthentication PR.
---
 .../main/java/org/apache/kafka/common/network/KafkaChannel.java   | 8 ++++++++
 core/src/main/scala/kafka/network/SocketServer.scala              | 8 +++-----
 2 files changed, 11 insertions(+), 5 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java b/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java
index 47b1375..fd34086 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java
@@ -19,7 +19,11 @@ package org.apache.kafka.common.network;
 import java.net.SocketAddress;
 import org.apache.kafka.common.errors.AuthenticationException;
 import org.apache.kafka.common.memory.MemoryPool;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.requests.RequestContext;
+import org.apache.kafka.common.requests.RequestHeader;
 import org.apache.kafka.common.security.auth.KafkaPrincipal;
+import org.apache.kafka.common.security.auth.SecurityProtocol;
 import org.apache.kafka.common.utils.Utils;
 
 import java.io.IOException;
@@ -238,6 +242,10 @@ public class KafkaChannel {
         return transportLayer.selectionKey();
     }
 
+    public RequestContext newRequestContext(RequestHeader header, ListenerName listenerName, SecurityProtocol protocol) {
+        return new RequestContext(header, id, socketAddress(), principal(), listenerName, protocol);
+    }
+
     /**
      * externally muting a channel should be done via selector to ensure proper state handling
      */
diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala
index b5b3e4d..da4b7bc 100644
--- a/core/src/main/scala/kafka/network/SocketServer.scala
+++ b/core/src/main/scala/kafka/network/SocketServer.scala
@@ -735,14 +735,12 @@ private[kafka] class Processor(val id: Int,
                 debug(s"Disconnected expired channel: $channel : $header")
                 expiredConnectionsKilledCount.record(null, 1, 0)
               } else {
-                val connectionId = receive.source
-                val context = new RequestContext(header, connectionId, channel.socketAddress,
-                  channel.principal, listenerName, securityProtocol)
+                val context = channel.newRequestContext(header, listenerName, securityProtocol)
                 val req = new RequestChannel.Request(processor = id, context = context,
                   startTimeNanos = nowNanos, memoryPool, receive.payload, requestChannel.metrics)
                 requestChannel.sendRequest(req)
-                selector.mute(connectionId)
-                handleChannelMuteEvent(connectionId, ChannelMuteEvent.REQUEST_RECEIVED)
+                selector.mute(channel.id)
+                handleChannelMuteEvent(channel.id, ChannelMuteEvent.REQUEST_RECEIVED)
               }
             }
           case None =>