You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2018/10/28 17:51:54 UTC
[kafka] 01/01: MINOR: Introduce KafkaChannel.newRequestContext
This is an automated email from the ASF dual-hosted git repository.
ijuma pushed a commit to branch kafka-channel-clean-up
in repository https://gitbox.apache.org/repos/asf/kafka.git
commit fd4f4914367efc1d1aa5c681548ee716f831cbae
Author: Ismael Juma <is...@juma.me.uk>
AuthorDate: Sun Oct 28 10:50:39 2018 -0700
MINOR: Introduce KafkaChannel.newRequestContext
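A minor clean-up that I noticed while looking at the
reauthentication PR: the construction of RequestContext moves out of
SocketServer's Processor and into a new KafkaChannel.newRequestContext
method, and the Processor now uses channel.id as the connection id
instead of receive.source.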
---
.../main/java/org/apache/kafka/common/network/KafkaChannel.java | 8 ++++++++
core/src/main/scala/kafka/network/SocketServer.scala | 8 +++-----
2 files changed, 11 insertions(+), 5 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java b/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java
index 47b1375..fd34086 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java
@@ -19,7 +19,11 @@ package org.apache.kafka.common.network;
import java.net.SocketAddress;
import org.apache.kafka.common.errors.AuthenticationException;
import org.apache.kafka.common.memory.MemoryPool;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.requests.RequestContext;
+import org.apache.kafka.common.requests.RequestHeader;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
+import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.utils.Utils;
import java.io.IOException;
@@ -238,6 +242,10 @@ public class KafkaChannel {
return transportLayer.selectionKey();
}
+ public RequestContext newRequestContext(RequestHeader header, ListenerName listenerName, SecurityProtocol protocol) {
+ return new RequestContext(header, id, socketAddress(), principal(), listenerName, protocol);
+ }
+
/**
* externally muting a channel should be done via selector to ensure proper state handling
*/
diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala
index b5b3e4d..da4b7bc 100644
--- a/core/src/main/scala/kafka/network/SocketServer.scala
+++ b/core/src/main/scala/kafka/network/SocketServer.scala
@@ -735,14 +735,12 @@ private[kafka] class Processor(val id: Int,
debug(s"Disconnected expired channel: $channel : $header")
expiredConnectionsKilledCount.record(null, 1, 0)
} else {
- val connectionId = receive.source
- val context = new RequestContext(header, connectionId, channel.socketAddress,
- channel.principal, listenerName, securityProtocol)
+ val context = channel.newRequestContext(header, listenerName, securityProtocol)
val req = new RequestChannel.Request(processor = id, context = context,
startTimeNanos = nowNanos, memoryPool, receive.payload, requestChannel.metrics)
requestChannel.sendRequest(req)
- selector.mute(connectionId)
- handleChannelMuteEvent(connectionId, ChannelMuteEvent.REQUEST_RECEIVED)
+ selector.mute(channel.id)
+ handleChannelMuteEvent(channel.id, ChannelMuteEvent.REQUEST_RECEIVED)
}
}
case None =>