You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by bo...@apache.org on 2020/10/29 19:42:24 UTC
[kafka] branch 2.7 updated: Revert "KAFKA-9705 part 1: add KIP-590
request header fields (#9144)" (#9523) (#9529)
This is an automated email from the ASF dual-hosted git repository.
boyang pushed a commit to branch 2.7
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.7 by this push:
new 3023ac6 Revert "KAFKA-9705 part 1: add KIP-590 request header fields (#9144)" (#9523) (#9529)
3023ac6 is described below
commit 3023ac6cede6777afe2f419b4068147ab4a1e04e
Author: Boyang Chen <bo...@confluent.io>
AuthorDate: Thu Oct 29 12:41:09 2020 -0700
Revert "KAFKA-9705 part 1: add KIP-590 request header fields (#9144)" (#9523) (#9529)
This reverts commit 21dc5231ce9c7398c7ede4dbefa2f2202e06b2d4, as we decided to use Envelope for redirection instead of the initial principal.
Reviewers: Jason Gustafson <ja...@confluent.io>
---
.../org/apache/kafka/clients/ClientRequest.java | 20 ++---
.../java/org/apache/kafka/clients/KafkaClient.java | 6 +-
.../org/apache/kafka/clients/NetworkClient.java | 6 +-
.../kafka/clients/admin/KafkaAdminClient.java | 2 +-
.../consumer/internals/ConsumerNetworkClient.java | 8 +-
.../kafka/clients/producer/internals/Sender.java | 6 +-
.../kafka/common/requests/RequestContext.java | 14 +---
.../kafka/common/requests/RequestHeader.java | 31 ++------
.../authenticator/SaslServerAuthenticator.java | 9 ++-
.../authorizer/AuthorizableRequestContext.java | 9 ---
.../resources/common/message/RequestHeader.json | 8 +-
.../java/org/apache/kafka/clients/MockClient.java | 6 +-
.../apache/kafka/clients/NetworkClientTest.java | 28 ++-----
.../clients/producer/internals/SenderTest.java | 4 +-
.../kafka/common/requests/RequestContextTest.java | 12 +--
.../kafka/common/requests/RequestHeaderTest.java | 13 ----
.../scala/kafka/common/InterBrokerSendThread.scala | 2 -
.../main/scala/kafka/network/SocketServer.scala | 28 ++-----
.../scala/kafka/raft/KafkaNetworkChannel.scala | 2 +-
.../main/scala/kafka/tools/TestRaftServer.scala | 5 +-
.../kafka/common/InterBrokerSendThreadTest.scala | 12 +--
.../unit/kafka/network/RequestChannelTest.scala | 3 +-
.../unit/kafka/network/SocketServerTest.scala | 91 ++--------------------
.../security/authorizer/AclAuthorizerTest.scala | 2 +-
.../kafka/server/BaseClientQuotaManagerTest.scala | 4 +-
.../scala/unit/kafka/server/KafkaApisTest.scala | 6 +-
.../server/ThrottledChannelExpirationTest.scala | 2 +-
.../kafka/jmh/acl/AclAuthorizerBenchmark.java | 2 +-
.../jmh/metadata/MetadataRequestBenchmark.java | 3 +-
29 files changed, 73 insertions(+), 271 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/clients/ClientRequest.java b/clients/src/main/java/org/apache/kafka/clients/ClientRequest.java
index 37b6397..627615c 100644
--- a/clients/src/main/java/org/apache/kafka/clients/ClientRequest.java
+++ b/clients/src/main/java/org/apache/kafka/clients/ClientRequest.java
@@ -34,8 +34,6 @@ public final class ClientRequest {
private final boolean expectResponse;
private final int requestTimeoutMs;
private final RequestCompletionHandler callback;
- private final String initialPrincipalName;
- private final String initialClientId;
/**
* @param destination The brokerId to send the request to
@@ -44,8 +42,6 @@ public final class ClientRequest {
* @param clientId The client ID to use for the header
* @param createdTimeMs The unix timestamp in milliseconds for the time at which this request was created.
* @param expectResponse Should we expect a response message or is this request complete once it is sent?
- * @param initialPrincipalName The initial principal name if this is a redirect request, or null if this was not redirected
- * @param initialClientId The initial client id if this is a redirect request, or null if this was not redirected
* @param callback A callback to execute when the response has been received (or null if no callback is necessary)
*/
public ClientRequest(String destination,
@@ -55,8 +51,6 @@ public final class ClientRequest {
long createdTimeMs,
boolean expectResponse,
int requestTimeoutMs,
- String initialPrincipalName,
- String initialClientId,
RequestCompletionHandler callback) {
this.destination = destination;
this.requestBuilder = requestBuilder;
@@ -66,8 +60,6 @@ public final class ClientRequest {
this.expectResponse = expectResponse;
this.requestTimeoutMs = requestTimeoutMs;
this.callback = callback;
- this.initialPrincipalName = initialPrincipalName;
- this.initialClientId = initialClientId;
}
@Override
@@ -93,13 +85,11 @@ public final class ClientRequest {
public RequestHeader makeHeader(short version) {
short requestApiKey = requestBuilder.apiKey().id;
return new RequestHeader(
- new RequestHeaderData()
- .setRequestApiKey(requestApiKey)
- .setRequestApiVersion(version)
- .setClientId(clientId)
- .setCorrelationId(correlationId)
- .setInitialPrincipalName(initialPrincipalName)
- .setInitialClientId(initialClientId),
+ new RequestHeaderData().
+ setRequestApiKey(requestApiKey).
+ setRequestApiVersion(version).
+ setClientId(clientId).
+ setCorrelationId(correlationId),
ApiKeys.forId(requestApiKey).requestHeaderVersion(version));
}
diff --git a/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java b/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java
index b539025..18a7eef 100644
--- a/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java
@@ -189,18 +189,16 @@ public interface KafkaClient extends Closeable {
* cancelling the request. The request may get cancelled sooner if the socket disconnects
* for any reason including if another pending request to the same node timed out first.
* @param callback the callback to invoke when we get a response
- * @param initialPrincipalName the initial client principal name, when building a forward request
- * @param initialClientId the initial client id, when building a forward request
*/
ClientRequest newClientRequest(String nodeId,
AbstractRequest.Builder<?> requestBuilder,
long createdTimeMs,
boolean expectResponse,
int requestTimeoutMs,
- String initialPrincipalName,
- String initialClientId,
RequestCompletionHandler callback);
+
+
/**
* Initiates shutdown of this client. This method may be invoked from another thread while this
* client is being polled. No further requests may be sent using the client. The current poll()
diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
index cf6ef87..5287124 100644
--- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
@@ -1199,7 +1199,7 @@ public class NetworkClient implements KafkaClient {
AbstractRequest.Builder<?> requestBuilder,
long createdTimeMs,
boolean expectResponse) {
- return newClientRequest(nodeId, requestBuilder, createdTimeMs, expectResponse, defaultRequestTimeoutMs, null, null, null);
+ return newClientRequest(nodeId, requestBuilder, createdTimeMs, expectResponse, defaultRequestTimeoutMs, null);
}
// visible for testing
@@ -1217,11 +1217,9 @@ public class NetworkClient implements KafkaClient {
long createdTimeMs,
boolean expectResponse,
int requestTimeoutMs,
- String initialPrincipalName,
- String initialClientId,
RequestCompletionHandler callback) {
return new ClientRequest(nodeId, requestBuilder, nextCorrelationId(), clientId, createdTimeMs, expectResponse,
- requestTimeoutMs, initialPrincipalName, initialClientId, callback);
+ requestTimeoutMs, callback);
}
public boolean discoverBrokerVersions() {
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index dd9010f..3a21d38 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -1090,7 +1090,7 @@ public class KafkaAdminClient extends AdminClient {
continue;
}
ClientRequest clientRequest = client.newClientRequest(node.idString(), requestBuilder, now,
- true, requestTimeoutMs, null, null, null);
+ true, requestTimeoutMs, null);
log.debug("Sending {} to {}. correlationId={}", requestBuilder, node, clientRequest.correlationId());
client.send(clientRequest, now);
getOrCreateListValue(callsInFlight, node.idString()).add(call);
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
index 5b732dc..8926abe 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
@@ -127,7 +127,7 @@ public class ConsumerNetworkClient implements Closeable {
long now = time.milliseconds();
RequestFutureCompletionHandler completionHandler = new RequestFutureCompletionHandler();
ClientRequest clientRequest = client.newClientRequest(node.idString(), requestBuilder, now, true,
- requestTimeoutMs, null, null, completionHandler);
+ requestTimeoutMs, completionHandler);
unsent.put(node, clientRequest);
// wakeup the client in case it is blocking in poll so that we can send the queued request
@@ -432,8 +432,8 @@ public class ConsumerNetworkClient implements Closeable {
RequestFutureCompletionHandler handler = (RequestFutureCompletionHandler) request.callback();
AuthenticationException authenticationException = client.authenticationException(node);
handler.onComplete(new ClientResponse(request.makeHeader(request.requestBuilder().latestAllowedVersion()),
- request.callback(), request.destination(), request.createdTimeMs(), now, true,
- null, authenticationException, null));
+ request.callback(), request.destination(), request.createdTimeMs(), now, true,
+ null, authenticationException, null));
}
}
}
@@ -594,7 +594,7 @@ public class ConsumerNetworkClient implements Closeable {
future.raise(response.authenticationException());
} else if (response.wasDisconnected()) {
log.debug("Cancelled request with header {} due to node {} being disconnected",
- response.requestHeader(), response.destination());
+ response.requestHeader(), response.destination());
future.raise(DisconnectException.INSTANCE);
} else if (response.versionMismatch() != null) {
future.raise(response.versionMismatch());
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
index 19b2cb1..abcc657 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
@@ -469,8 +469,8 @@ public class Sender implements Runnable {
time.sleep(nextRequestHandler.retryBackoffMs());
long currentTimeMs = time.milliseconds();
- ClientRequest clientRequest = client.newClientRequest(targetNode.idString(), requestBuilder, currentTimeMs, true,
- requestTimeoutMs, null, null, nextRequestHandler);
+ ClientRequest clientRequest = client.newClientRequest(
+ targetNode.idString(), requestBuilder, currentTimeMs, true, requestTimeoutMs, nextRequestHandler);
log.debug("Sending transactional request {} to node {} with correlation ID {}", requestBuilder, targetNode, clientRequest.correlationId());
client.send(clientRequest, currentTimeMs);
transactionManager.setInFlightCorrelationId(clientRequest.correlationId());
@@ -758,7 +758,7 @@ public class Sender implements Runnable {
String nodeId = Integer.toString(destination);
ClientRequest clientRequest = client.newClientRequest(nodeId, requestBuilder, now, acks != 0,
- requestTimeoutMs, null, null, callback);
+ requestTimeoutMs, callback);
client.send(clientRequest, now);
log.trace("Sent produce request to {}: {}", nodeId, requestBuilder);
}
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/RequestContext.java b/clients/src/main/java/org/apache/kafka/common/requests/RequestContext.java
index f834135..71fbc33 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/RequestContext.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/RequestContext.java
@@ -40,7 +40,6 @@ public class RequestContext implements AuthorizableRequestContext {
public final ListenerName listenerName;
public final SecurityProtocol securityProtocol;
public final ClientInformation clientInformation;
- public final boolean fromPrivilegedListener;
public RequestContext(RequestHeader header,
String connectionId,
@@ -48,8 +47,7 @@ public class RequestContext implements AuthorizableRequestContext {
KafkaPrincipal principal,
ListenerName listenerName,
SecurityProtocol securityProtocol,
- ClientInformation clientInformation,
- boolean fromPrivilegedListener) {
+ ClientInformation clientInformation) {
this.header = header;
this.connectionId = connectionId;
this.clientAddress = clientAddress;
@@ -57,7 +55,6 @@ public class RequestContext implements AuthorizableRequestContext {
this.listenerName = listenerName;
this.securityProtocol = securityProtocol;
this.clientInformation = clientInformation;
- this.fromPrivilegedListener = fromPrivilegedListener;
}
public RequestAndSize parseRequest(ByteBuffer buffer) {
@@ -77,9 +74,7 @@ public class RequestContext implements AuthorizableRequestContext {
", apiVersion: " + header.apiVersion() +
", connectionId: " + connectionId +
", listenerName: " + listenerName +
- ", principal: " + principal +
- ", initialPrincipal: " + initialPrincipalName() +
- ", initialClientId: " + header.initialClientId(), ex);
+ ", principal: " + principal, ex);
}
}
}
@@ -139,9 +134,4 @@ public class RequestContext implements AuthorizableRequestContext {
public int correlationId() {
return header.correlationId();
}
-
- @Override
- public String initialPrincipalName() {
- return header.initialPrincipalName();
- }
}
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/RequestHeader.java b/clients/src/main/java/org/apache/kafka/common/requests/RequestHeader.java
index a5d90d2..3d80c4e 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/RequestHeader.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/RequestHeader.java
@@ -37,22 +37,11 @@ public class RequestHeader implements AbstractRequestResponse {
}
public RequestHeader(ApiKeys requestApiKey, short requestVersion, String clientId, int correlationId) {
- this(requestApiKey, requestVersion, clientId, correlationId, null, null);
- }
-
- public RequestHeader(ApiKeys requestApiKey,
- short requestVersion,
- String clientId,
- int correlationId,
- String initialPrincipalName,
- String initialClientId) {
- this(new RequestHeaderData()
- .setRequestApiKey(requestApiKey.id)
- .setRequestApiVersion(requestVersion)
- .setClientId(clientId)
- .setCorrelationId(correlationId)
- .setInitialPrincipalName(initialPrincipalName)
- .setInitialClientId(initialClientId),
+ this(new RequestHeaderData().
+ setRequestApiKey(requestApiKey.id).
+ setRequestApiVersion(requestVersion).
+ setClientId(clientId).
+ setCorrelationId(correlationId),
ApiKeys.forId(requestApiKey.id).requestHeaderVersion(requestVersion));
}
@@ -81,14 +70,6 @@ public class RequestHeader implements AbstractRequestResponse {
return data.clientId();
}
- public String initialPrincipalName() {
- return data.initialPrincipalName();
- }
-
- public String initialClientId() {
- return data.initialClientId();
- }
-
public int correlationId() {
return data.correlationId();
}
@@ -125,8 +106,6 @@ public class RequestHeader implements AbstractRequestResponse {
", apiVersion=" + apiVersion() +
", clientId=" + clientId() +
", correlationId=" + correlationId() +
- ", initialPrincipalName=" + initialPrincipalName() +
- ", initialClientId=" + initialClientId() +
")";
}
diff --git a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
index 20dbf7b..ecd5d4f 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
@@ -237,12 +237,15 @@ public class SaslServerAuthenticator implements Authenticator {
if (saslState != SaslState.REAUTH_PROCESS_HANDSHAKE) {
if (netOutBuffer != null && !flushNetOutBufferAndUpdateInterestOps())
return;
+
if (saslServer != null && saslServer.isComplete()) {
setSaslState(SaslState.COMPLETE);
return;
}
+
// allocate on heap (as opposed to any socket server memory pool)
if (netInBuffer == null) netInBuffer = new NetworkReceive(MAX_RECEIVE_SIZE, connectionId);
+
netInBuffer.readFrom(transportLayer);
if (!netInBuffer.complete())
return;
@@ -407,7 +410,7 @@ public class SaslServerAuthenticator implements Authenticator {
ApiKeys apiKey = header.apiKey();
short version = header.apiVersion();
RequestContext requestContext = new RequestContext(header, connectionId, clientAddress(),
- KafkaPrincipal.ANONYMOUS, listenerName, securityProtocol, ClientInformation.EMPTY, false);
+ KafkaPrincipal.ANONYMOUS, listenerName, securityProtocol, ClientInformation.EMPTY);
RequestAndSize requestAndSize = requestContext.parseRequest(requestBuffer);
if (apiKey != ApiKeys.SASL_AUTHENTICATE) {
IllegalSaslStateException e = new IllegalSaslStateException("Unexpected Kafka request of type " + apiKey + " during SASL authentication.");
@@ -493,7 +496,7 @@ public class SaslServerAuthenticator implements Authenticator {
RequestContext requestContext = new RequestContext(header, connectionId, clientAddress(),
- KafkaPrincipal.ANONYMOUS, listenerName, securityProtocol, ClientInformation.EMPTY, false);
+ KafkaPrincipal.ANONYMOUS, listenerName, securityProtocol, ClientInformation.EMPTY);
RequestAndSize requestAndSize = requestContext.parseRequest(requestBuffer);
if (apiKey == ApiKeys.API_VERSIONS)
handleApiVersionsRequest(requestContext, (ApiVersionsRequest) requestAndSize.request);
@@ -708,6 +711,6 @@ public class SaslServerAuthenticator implements Authenticator {
private long zeroIfNegative(long value) {
return Math.max(0L, value);
- }
+ }
}
}
diff --git a/clients/src/main/java/org/apache/kafka/server/authorizer/AuthorizableRequestContext.java b/clients/src/main/java/org/apache/kafka/server/authorizer/AuthorizableRequestContext.java
index eb9b02f..f68b938 100644
--- a/clients/src/main/java/org/apache/kafka/server/authorizer/AuthorizableRequestContext.java
+++ b/clients/src/main/java/org/apache/kafka/server/authorizer/AuthorizableRequestContext.java
@@ -69,13 +69,4 @@ public interface AuthorizableRequestContext {
* Returns the correlation id from the request header.
*/
int correlationId();
-
- /**
- * Returns the initial principal name for a forwarded request, or null if the
- * request is not forwarding. Note that this field should be used only for
- * audit logging purpose, not for authorization.
- */
- default String initialPrincipalName() {
- return null;
- }
}
diff --git a/clients/src/main/resources/common/message/RequestHeader.json b/clients/src/main/resources/common/message/RequestHeader.json
index 4285633..fbf4e2c 100644
--- a/clients/src/main/resources/common/message/RequestHeader.json
+++ b/clients/src/main/resources/common/message/RequestHeader.json
@@ -37,12 +37,6 @@
// Since the client is sending the ApiVersionsRequest in order to discover what
// versions are supported, the client does not know the best version to use.
{ "name": "ClientId", "type": "string", "versions": "1+", "nullableVersions": "1+", "ignorable": true,
- "flexibleVersions": "none", "about": "The client ID string." },
- { "name": "InitialPrincipalName", "type": "string", "tag": 0, "taggedVersions": "2+",
- "nullableVersions": "2+", "default": "null", "ignorable": true,
- "about": "Optional value of the initial principal name when the request is redirected by a broker, for audit logging and quota purpose." },
- { "name": "InitialClientId", "type": "string", "tag": 1, "taggedVersions": "2+",
- "nullableVersions": "2+", "default": "null", "ignorable": true,
- "about": "Optional value of the initial client id when the request is redirected by a broker, for quota purpose." }
+ "flexibleVersions": "none", "about": "The client ID string." }
]
}
diff --git a/clients/src/test/java/org/apache/kafka/clients/MockClient.java b/clients/src/test/java/org/apache/kafka/clients/MockClient.java
index 20f5843..214ecc5 100644
--- a/clients/src/test/java/org/apache/kafka/clients/MockClient.java
+++ b/clients/src/test/java/org/apache/kafka/clients/MockClient.java
@@ -507,7 +507,7 @@ public class MockClient implements KafkaClient {
@Override
public ClientRequest newClientRequest(String nodeId, AbstractRequest.Builder<?> requestBuilder, long createdTimeMs,
boolean expectResponse) {
- return newClientRequest(nodeId, requestBuilder, createdTimeMs, expectResponse, 5000, null, null, null);
+ return newClientRequest(nodeId, requestBuilder, createdTimeMs, expectResponse, 5000, null);
}
@Override
@@ -516,11 +516,9 @@ public class MockClient implements KafkaClient {
long createdTimeMs,
boolean expectResponse,
int requestTimeoutMs,
- String initialPrincipalName,
- String initialClientId,
RequestCompletionHandler callback) {
return new ClientRequest(nodeId, requestBuilder, correlation++, "mockClientId", createdTimeMs,
- expectResponse, requestTimeoutMs, initialPrincipalName, initialClientId, callback);
+ expectResponse, requestTimeoutMs, callback);
}
@Override
diff --git a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
index a8a1efc..896c237 100644
--- a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
@@ -152,20 +152,6 @@ public class NetworkClientTest {
}
@Test
- public void testIncludeInitialPrincipalNameAndClientIdInHeader() {
- MetadataRequest.Builder builder = new MetadataRequest.Builder(Collections.singletonList("test"), true);
- final String initialPrincipalName = "initial-principal";
- final String initialClientId = "initial-client";
-
- ClientRequest request = client.newClientRequest("5", builder, 0L, false,
- defaultRequestTimeoutMs, initialPrincipalName, initialClientId, null);
- RequestHeader header = request.makeHeader(builder.latestAllowedVersion());
-
- assertEquals(initialPrincipalName, header.initialPrincipalName());
- assertEquals(initialClientId, header.initialClientId());
- }
-
- @Test
public void testClose() {
client.ready(node, time.milliseconds());
awaitReady(client, node);
@@ -208,8 +194,8 @@ public class NetworkClientTest {
Collections.emptyMap(),
null);
TestCallbackHandler handler = new TestCallbackHandler();
- ClientRequest request = networkClient.newClientRequest(node.idString(), builder, time.milliseconds(), true,
- defaultRequestTimeoutMs, null, null, handler);
+ ClientRequest request = networkClient.newClientRequest(
+ node.idString(), builder, time.milliseconds(), true, defaultRequestTimeoutMs, handler);
networkClient.send(request, time.milliseconds());
networkClient.poll(1, time.milliseconds());
assertEquals(1, networkClient.inFlightRequestCount());
@@ -454,7 +440,7 @@ public class NetworkClientTest {
TestCallbackHandler handler = new TestCallbackHandler();
int requestTimeoutMs = defaultRequestTimeoutMs + 5000;
ClientRequest request = client.newClientRequest(node.idString(), builder, time.milliseconds(), true,
- requestTimeoutMs, null, null, handler);
+ requestTimeoutMs, handler);
assertEquals(requestTimeoutMs, request.requestTimeoutMs());
testRequestTimeout(request);
}
@@ -522,7 +508,7 @@ public class NetworkClientTest {
null);
TestCallbackHandler handler = new TestCallbackHandler();
ClientRequest request = client.newClientRequest(node.idString(), builder, time.milliseconds(), true,
- defaultRequestTimeoutMs, null, null, handler);
+ defaultRequestTimeoutMs, handler);
client.send(request, time.milliseconds());
client.poll(1, time.milliseconds());
ResponseHeader respHeader =
@@ -608,7 +594,7 @@ public class NetworkClientTest {
Collections.emptyMap());
TestCallbackHandler handler = new TestCallbackHandler();
ClientRequest request = client.newClientRequest(nodeId, builder, time.milliseconds(), true,
- defaultRequestTimeoutMs, null, null, handler);
+ defaultRequestTimeoutMs, handler);
client.send(request, time.milliseconds());
return request.correlationId();
}
@@ -889,11 +875,11 @@ public class NetworkClientTest {
}
};
- ClientRequest request1 = client.newClientRequest(node.idString(), builder, now, true, defaultRequestTimeoutMs, null, null, callback);
+ ClientRequest request1 = client.newClientRequest(node.idString(), builder, now, true, defaultRequestTimeoutMs, callback);
client.send(request1, now);
client.poll(0, now);
- ClientRequest request2 = client.newClientRequest(node.idString(), builder, now, true, defaultRequestTimeoutMs, null, null, callback);
+ ClientRequest request2 = client.newClientRequest(node.idString(), builder, now, true, defaultRequestTimeoutMs, callback);
client.send(request2, now);
client.poll(0, now);
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
index 771f699..10deb42 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
@@ -2299,7 +2299,7 @@ public class SenderTest {
InOrder inOrder = inOrder(client);
inOrder.verify(client, atLeastOnce()).ready(any(), anyLong());
inOrder.verify(client, atLeastOnce()).newClientRequest(anyString(), any(), anyLong(), anyBoolean(), anyInt(),
- any(), any(), any());
+ any());
inOrder.verify(client, atLeastOnce()).send(any(), anyLong());
inOrder.verify(client).poll(eq(0L), anyLong());
inOrder.verify(client).poll(eq(accumulator.getDeliveryTimeoutMs()), anyLong());
@@ -2664,7 +2664,7 @@ public class SenderTest {
private TransactionManager createTransactionManager() {
return new TransactionManager(new LogContext(), null, 0, 100L, new ApiVersions(), false);
}
-
+
private void setupWithTransactionState(TransactionManager transactionManager) {
setupWithTransactionState(transactionManager, false, null, true);
}
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestContextTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestContextTest.java
index 7e0901f..04d7726 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/RequestContextTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestContextTest.java
@@ -29,7 +29,6 @@ import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.junit.Test;
import java.net.InetAddress;
-import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import static org.junit.Assert.assertEquals;
@@ -43,7 +42,7 @@ public class RequestContextTest {
RequestHeader header = new RequestHeader(ApiKeys.API_VERSIONS, Short.MAX_VALUE, "", correlationId);
RequestContext context = new RequestContext(header, "0", InetAddress.getLocalHost(), KafkaPrincipal.ANONYMOUS,
- new ListenerName("ssl"), SecurityProtocol.SASL_SSL, ClientInformation.EMPTY, false);
+ new ListenerName("ssl"), SecurityProtocol.SASL_SSL, ClientInformation.EMPTY);
assertEquals(0, context.apiVersion());
// Write some garbage to the request buffer. This should be ignored since we will treat
@@ -80,13 +79,4 @@ public class RequestContextTest {
assertTrue(response.data.apiKeys().isEmpty());
}
- @Test
- public void testInitialPrincipalName() throws UnknownHostException {
- final String initialPrincipalName = "initial-principal";
- RequestHeader header = new RequestHeader(ApiKeys.API_VERSIONS, Short.MAX_VALUE, "", 1, initialPrincipalName, null);
- RequestContext context = new RequestContext(header, "0", InetAddress.getLocalHost(), KafkaPrincipal.ANONYMOUS,
- new ListenerName("ssl"), SecurityProtocol.SASL_SSL, ClientInformation.EMPTY, false);
-
- assertEquals(initialPrincipalName, context.initialPrincipalName());
- }
}
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestHeaderTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestHeaderTest.java
index cc8a72d..2842ae8 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/RequestHeaderTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestHeaderTest.java
@@ -73,17 +73,4 @@ public class RequestHeaderTest {
RequestHeader deserialized = RequestHeader.parse(buffer);
assertEquals(header, deserialized);
}
-
- @Test
- public void testRequestHeaderWithInitialPrincipalAndClientId() {
- final String initialPrincipalName = "initial-principal";
- final String initialClientId = "initial-client";
- RequestHeader header = new RequestHeader(ApiKeys.CREATE_DELEGATION_TOKEN, (short) 2, "", 10,
- initialPrincipalName, initialClientId);
- assertEquals(2, header.headerVersion());
- ByteBuffer buffer = toBuffer(header.toStruct());
- assertEquals(17 + initialPrincipalName.length() + initialClientId.length(), buffer.remaining());
- RequestHeader deserialized = RequestHeader.parse(buffer);
- assertEquals(header, deserialized);
- }
}
diff --git a/core/src/main/scala/kafka/common/InterBrokerSendThread.scala b/core/src/main/scala/kafka/common/InterBrokerSendThread.scala
index a9048a0..11e1aa8 100644
--- a/core/src/main/scala/kafka/common/InterBrokerSendThread.scala
+++ b/core/src/main/scala/kafka/common/InterBrokerSendThread.scala
@@ -63,8 +63,6 @@ abstract class InterBrokerSendThread(name: String,
now,
true,
requestTimeoutMs,
- null,
- null,
completionHandler))
}
diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala
index dac2882..7b816c4 100644
--- a/core/src/main/scala/kafka/network/SocketServer.scala
+++ b/core/src/main/scala/kafka/network/SocketServer.scala
@@ -261,8 +261,7 @@ class SocketServer(val config: KafkaConfig,
endpointOpt.foreach { endpoint =>
connectionQuotas.addListener(config, endpoint.listenerName)
val controlPlaneAcceptor = createAcceptor(endpoint, ControlPlaneMetricPrefix)
- val controlPlaneProcessor = newProcessor(nextProcessorId, controlPlaneRequestChannelOpt.get,
- connectionQuotas, endpoint.listenerName, endpoint.securityProtocol, memoryPool, isPrivilegedListener = true)
+ val controlPlaneProcessor = newProcessor(nextProcessorId, controlPlaneRequestChannelOpt.get, connectionQuotas, endpoint.listenerName, endpoint.securityProtocol, memoryPool)
controlPlaneAcceptorOpt = Some(controlPlaneAcceptor)
controlPlaneProcessorOpt = Some(controlPlaneProcessor)
val listenerProcessors = new ArrayBuffer[Processor]()
@@ -285,11 +284,8 @@ class SocketServer(val config: KafkaConfig,
val listenerName = endpoint.listenerName
val securityProtocol = endpoint.securityProtocol
val listenerProcessors = new ArrayBuffer[Processor]()
- val isPrivilegedListener = controlPlaneRequestChannelOpt.isEmpty && config.interBrokerListenerName == listenerName
-
for (_ <- 0 until newProcessorsPerListener) {
- val processor = newProcessor(nextProcessorId, dataPlaneRequestChannel, connectionQuotas,
- listenerName, securityProtocol, memoryPool, isPrivilegedListener)
+ val processor = newProcessor(nextProcessorId, dataPlaneRequestChannel, connectionQuotas, listenerName, securityProtocol, memoryPool)
listenerProcessors += processor
dataPlaneRequestChannel.addProcessor(processor)
nextProcessorId += 1
@@ -415,7 +411,7 @@ class SocketServer(val config: KafkaConfig,
// `protected` for test usage
protected[network] def newProcessor(id: Int, requestChannel: RequestChannel, connectionQuotas: ConnectionQuotas, listenerName: ListenerName,
- securityProtocol: SecurityProtocol, memoryPool: MemoryPool, isPrivilegedListener: Boolean): Processor = {
+ securityProtocol: SecurityProtocol, memoryPool: MemoryPool): Processor = {
new Processor(id,
time,
config.socketRequestMaxBytes,
@@ -430,7 +426,6 @@ class SocketServer(val config: KafkaConfig,
credentialProvider,
memoryPool,
logContext,
- isPrivilegedListener = isPrivilegedListener
)
}
@@ -655,8 +650,8 @@ private[kafka] class Acceptor(val endPoint: EndPoint,
}
/**
- * Create a server socket to listen for connections on.
- */
+ * Create a server socket to listen for connections on.
+ */
private def openServerSocket(host: String, port: Int): ServerSocketChannel = {
val socketAddress =
if (host == null || host.trim.isEmpty)
@@ -730,12 +725,6 @@ private[kafka] object Processor {
/**
* Thread that processes all requests from a single connection. There are N of these running in parallel
* each of which has its own selector
- *
- * @param isPrivilegedListener The privileged listener flag is used as one factor to determine whether
- * a certain request is forwarded or not. When the control plane is defined,
- * the control plane processor would be fellow broker's choice for sending
- * forwarding requests; if the control plane is not defined, the processor
- * relying on the inter broker listener would be acting as the privileged listener.
*/
private[kafka] class Processor(val id: Int,
time: Time,
@@ -751,8 +740,7 @@ private[kafka] class Processor(val id: Int,
credentialProvider: CredentialProvider,
memoryPool: MemoryPool,
logContext: LogContext,
- connectionQueueSize: Int = ConnectionQueueSize,
- isPrivilegedListener: Boolean = false) extends AbstractServerThread(connectionQuotas) with KafkaMetricsGroup {
+ connectionQueueSize: Int = ConnectionQueueSize) extends AbstractServerThread(connectionQuotas) with KafkaMetricsGroup {
private object ConnectionId {
def fromString(s: String): Option[ConnectionId] = s.split("-") match {
@@ -781,7 +769,7 @@ private[kafka] class Processor(val id: Int,
newGauge(IdlePercentMetricName, () => {
Option(metrics.metric(metrics.metricName("io-wait-ratio", MetricsGroup, metricTags))).fold(0.0)(m =>
Math.min(m.metricValue.asInstanceOf[Double], 1.0))
- },
+ },
// for compatibility, only add a networkProcessor tag to the Yammer Metrics alias (the equivalent Selector metric
// also includes the listener name)
Map(NetworkProcessorMetricTag -> id.toString)
@@ -969,7 +957,7 @@ private[kafka] class Processor(val id: Int,
val connectionId = receive.source
val context = new RequestContext(header, connectionId, channel.socketAddress,
channel.principal, listenerName, securityProtocol,
- channel.channelMetadataRegistry.clientInformation, isPrivilegedListener)
+ channel.channelMetadataRegistry.clientInformation)
val req = new RequestChannel.Request(processor = id, context = context,
startTimeNanos = nowNanos, memoryPool, receive.payload, requestChannel.metrics)
// KIP-511: ApiVersionsRequest is intercepted here to catch the client software name
diff --git a/core/src/main/scala/kafka/raft/KafkaNetworkChannel.scala b/core/src/main/scala/kafka/raft/KafkaNetworkChannel.scala
index cac21a4..4637433 100644
--- a/core/src/main/scala/kafka/raft/KafkaNetworkChannel.scala
+++ b/core/src/main/scala/kafka/raft/KafkaNetworkChannel.scala
@@ -107,7 +107,7 @@ class KafkaNetworkChannel(time: Time,
val correlationId = req.correlationId
val createdTimeMs = req.createdTimeMs
new ClientRequest(destination, request, correlationId, clientId, createdTimeMs, true,
- requestTimeoutMs, null, null, null)
+ requestTimeoutMs, null)
}
override def send(message: RaftMessage): Unit = {
diff --git a/core/src/main/scala/kafka/tools/TestRaftServer.scala b/core/src/main/scala/kafka/tools/TestRaftServer.scala
index 85444b4..a99bc6e 100644
--- a/core/src/main/scala/kafka/tools/TestRaftServer.scala
+++ b/core/src/main/scala/kafka/tools/TestRaftServer.scala
@@ -317,7 +317,6 @@ class RaftSocketServer(
listenerName: ListenerName,
securityProtocol: SecurityProtocol,
memoryPool: MemoryPool,
- isPrivilegedListener: Boolean
): Processor = {
new Processor(id,
time,
@@ -332,9 +331,7 @@ class RaftSocketServer(
metrics,
credentialProvider,
memoryPool,
- logContext,
- isPrivilegedListener = isPrivilegedListener
- ) {
+ logContext) {
// We extend this API to skip the check for only enabled APIs. This
// gets us access to Vote, BeginQuorumEpoch, etc. which are not usable
// from the Kafka broker yet.
diff --git a/core/src/test/scala/kafka/common/InterBrokerSendThreadTest.scala b/core/src/test/scala/kafka/common/InterBrokerSendThreadTest.scala
index 5f8a8ee..f5110bf 100644
--- a/core/src/test/scala/kafka/common/InterBrokerSendThreadTest.scala
+++ b/core/src/test/scala/kafka/common/InterBrokerSendThreadTest.scala
@@ -64,7 +64,7 @@ class InterBrokerSendThreadTest {
override def generateRequests() = List[RequestAndCompletionHandler](handler)
}
- val clientRequest = new ClientRequest("dest", request, 0, "1", 0, true, requestTimeoutMs, null, null, handler.handler)
+ val clientRequest = new ClientRequest("dest", request, 0, "1", 0, true, requestTimeoutMs, handler.handler)
EasyMock.expect(networkClient.newClientRequest(
EasyMock.eq("1"),
@@ -72,8 +72,6 @@ class InterBrokerSendThreadTest {
EasyMock.anyLong(),
EasyMock.eq(true),
EasyMock.eq(requestTimeoutMs),
- EasyMock.eq(null),
- EasyMock.eq(null),
EasyMock.same(handler.handler)))
.andReturn(clientRequest)
@@ -103,7 +101,7 @@ class InterBrokerSendThreadTest {
override def generateRequests() = List[RequestAndCompletionHandler](requestAndCompletionHandler)
}
- val clientRequest = new ClientRequest("dest", request, 0, "1", 0, true, requestTimeoutMs, null, null, requestAndCompletionHandler.handler)
+ val clientRequest = new ClientRequest("dest", request, 0, "1", 0, true, requestTimeoutMs, requestAndCompletionHandler.handler)
EasyMock.expect(networkClient.newClientRequest(
EasyMock.eq("1"),
@@ -111,8 +109,6 @@ class InterBrokerSendThreadTest {
EasyMock.anyLong(),
EasyMock.eq(true),
EasyMock.eq(requestTimeoutMs),
- EasyMock.eq(null),
- EasyMock.eq(null),
EasyMock.same(requestAndCompletionHandler.handler)))
.andReturn(clientRequest)
@@ -156,8 +152,6 @@ class InterBrokerSendThreadTest {
time.milliseconds(),
true,
requestTimeoutMs,
- null,
- null,
handler.handler)
time.sleep(1500)
@@ -167,8 +161,6 @@ class InterBrokerSendThreadTest {
EasyMock.eq(time.milliseconds()),
EasyMock.eq(true),
EasyMock.eq(requestTimeoutMs),
- EasyMock.eq(null),
- EasyMock.eq(null),
EasyMock.same(handler.handler)))
.andReturn(clientRequest)
diff --git a/core/src/test/scala/unit/kafka/network/RequestChannelTest.scala b/core/src/test/scala/unit/kafka/network/RequestChannelTest.scala
index d9740ba..ce19491 100644
--- a/core/src/test/scala/unit/kafka/network/RequestChannelTest.scala
+++ b/core/src/test/scala/unit/kafka/network/RequestChannelTest.scala
@@ -182,8 +182,7 @@ class RequestChannelTest {
new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "user"),
ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT),
SecurityProtocol.PLAINTEXT,
- new ClientInformation("name", "version"),
- false)
+ new ClientInformation("name", "version"))
}
private def toMap(config: Config): Map[String, String] = {
diff --git a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
index 8190fb0..da18e9e 100644
--- a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
+++ b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
@@ -88,8 +88,6 @@ class SocketServerTest {
// Run the tests with TRACE logging to exercise request logging path
logLevelToRestore = kafkaLogger.getLevel
kafkaLogger.setLevel(Level.TRACE)
-
- assertTrue(server.controlPlaneRequestChannelOpt.isEmpty)
}
@After
@@ -304,7 +302,6 @@ class SocketServerTest {
val config = KafkaConfig.fromProps(testProps)
val testableServer = new TestableSocketServer(config)
testableServer.startup(startProcessingRequests = false)
-
val updatedEndPoints = config.advertisedListeners.map { endpoint =>
endpoint.copy(port = testableServer.boundPort(endpoint.listenerName))
}.map(_.toJava)
@@ -521,10 +518,10 @@ class SocketServerTest {
val overrideConnectionId = "127.0.0.1:1-127.0.0.1:2-0"
val overrideServer = new SocketServer(KafkaConfig.fromProps(props), serverMetrics, time, credentialProvider) {
override def newProcessor(id: Int, requestChannel: RequestChannel, connectionQuotas: ConnectionQuotas, listenerName: ListenerName,
- protocol: SecurityProtocol, memoryPool: MemoryPool, isPrivilegedListener: Boolean = false): Processor = {
+ protocol: SecurityProtocol, memoryPool: MemoryPool): Processor = {
new Processor(id, time, config.socketRequestMaxBytes, dataPlaneRequestChannel, connectionQuotas,
config.connectionsMaxIdleMs, config.failedAuthenticationDelayMs, listenerName, protocol, config, metrics,
- credentialProvider, memoryPool, new LogContext(), isPrivilegedListener = isPrivilegedListener) {
+ credentialProvider, memoryPool, new LogContext()) {
override protected[network] def connectionId(socket: Socket): String = overrideConnectionId
override protected[network] def createSelector(channelBuilder: ChannelBuilder): Selector = {
val testableSelector = new TestableSelector(config, channelBuilder, time, metrics)
@@ -1021,10 +1018,10 @@ class SocketServerTest {
var conn: Socket = null
val overrideServer = new SocketServer(KafkaConfig.fromProps(props), serverMetrics, Time.SYSTEM, credentialProvider) {
override def newProcessor(id: Int, requestChannel: RequestChannel, connectionQuotas: ConnectionQuotas, listenerName: ListenerName,
- protocol: SecurityProtocol, memoryPool: MemoryPool, isPrivilegedListener: Boolean = false): Processor = {
+ protocol: SecurityProtocol, memoryPool: MemoryPool): Processor = {
new Processor(id, time, config.socketRequestMaxBytes, dataPlaneRequestChannel, connectionQuotas,
config.connectionsMaxIdleMs, config.failedAuthenticationDelayMs, listenerName, protocol, config, metrics,
- credentialProvider, MemoryPool.NONE, new LogContext(), isPrivilegedListener = isPrivilegedListener) {
+ credentialProvider, MemoryPool.NONE, new LogContext()) {
override protected[network] def sendResponse(response: RequestChannel.Response, responseSend: Send): Unit = {
conn.close()
super.sendResponse(response, responseSend)
@@ -1061,10 +1058,10 @@ class SocketServerTest {
@volatile var selector: TestableSelector = null
val overrideServer = new SocketServer(KafkaConfig.fromProps(props), serverMetrics, Time.SYSTEM, credentialProvider) {
override def newProcessor(id: Int, requestChannel: RequestChannel, connectionQuotas: ConnectionQuotas, listenerName: ListenerName,
- protocol: SecurityProtocol, memoryPool: MemoryPool, isPrivilegedListener: Boolean = false): Processor = {
+ protocol: SecurityProtocol, memoryPool: MemoryPool): Processor = {
new Processor(id, time, config.socketRequestMaxBytes, dataPlaneRequestChannel, connectionQuotas,
config.connectionsMaxIdleMs, config.failedAuthenticationDelayMs, listenerName, protocol, config, metrics,
- credentialProvider, memoryPool, new LogContext(), isPrivilegedListener = isPrivilegedListener) {
+ credentialProvider, memoryPool, new LogContext()) {
override protected[network] def createSelector(channelBuilder: ChannelBuilder): Selector = {
val testableSelector = new TestableSelector(config, channelBuilder, time, metrics)
selector = testableSelector
@@ -1476,9 +1473,6 @@ class SocketServerTest {
props ++= sslServerProps
val testableServer = new TestableSocketServer(time = time)
testableServer.startup()
-
- assertTrue(testableServer.controlPlaneRequestChannelOpt.isEmpty)
-
val proxyServer = new ProxyServer(testableServer)
try {
val testableSelector = testableServer.testableSelector
@@ -1676,75 +1670,6 @@ class SocketServerTest {
}
}
-
- @Test
- def testControlPlaneAsPrivilegedListener(): Unit = {
- val testProps = new Properties
- testProps ++= props
- testProps.put("listeners", "PLAINTEXT://localhost:0,CONTROLLER://localhost:0")
- testProps.put("listener.security.protocol.map", "PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT")
- testProps.put("control.plane.listener.name", "CONTROLLER")
- val config = KafkaConfig.fromProps(testProps)
- withTestableServer(config, { testableServer =>
- val controlPlaneSocket = connect(testableServer, config.controlPlaneListenerName.get,
- localAddr = InetAddress.getLocalHost)
- val sentRequest = sendAndReceiveControllerRequest(controlPlaneSocket, testableServer)
- assertTrue(sentRequest.context.fromPrivilegedListener)
-
- val plainSocket = connect(testableServer, localAddr = InetAddress.getLocalHost)
- val plainRequest = sendAndReceiveRequest(plainSocket, testableServer)
- assertFalse(plainRequest.context.fromPrivilegedListener)
- })
- }
-
- @Test
- def testInterBrokerListenerAsPrivilegedListener(): Unit = {
- val testProps = new Properties
- testProps ++= props
- testProps.put("listeners", "EXTERNAL://localhost:0,INTERNAL://localhost:0")
- testProps.put("listener.security.protocol.map", "EXTERNAL:PLAINTEXT,INTERNAL:PLAINTEXT")
- testProps.put("inter.broker.listener.name", "INTERNAL")
- val config = KafkaConfig.fromProps(testProps)
- withTestableServer(config, { testableServer =>
- val interBrokerSocket = connect(testableServer, config.interBrokerListenerName,
- localAddr = InetAddress.getLocalHost)
- val sentRequest = sendAndReceiveRequest(interBrokerSocket, testableServer)
- assertTrue(sentRequest.context.fromPrivilegedListener)
-
- val externalSocket = connect(testableServer, new ListenerName("EXTERNAL"),
- localAddr = InetAddress.getLocalHost)
- val externalRequest = sendAndReceiveRequest(externalSocket, testableServer)
- assertFalse(externalRequest.context.fromPrivilegedListener)
- })
- }
-
- @Test
- def testControlPlaneTakePrecedenceOverInterBrokerListenerAsPrivilegedListener(): Unit = {
- val testProps = new Properties
- testProps ++= props
- testProps.put("listeners", "EXTERNAL://localhost:0,INTERNAL://localhost:0,CONTROLLER://localhost:0")
- testProps.put("listener.security.protocol.map", "EXTERNAL:PLAINTEXT,INTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT")
- testProps.put("control.plane.listener.name", "CONTROLLER")
- testProps.put("inter.broker.listener.name", "INTERNAL")
- val config = KafkaConfig.fromProps(testProps)
- withTestableServer(config, { testableServer =>
- val controlPlaneSocket = connect(testableServer, config.controlPlaneListenerName.get,
- localAddr = InetAddress.getLocalHost)
- val controlPlaneRequest = sendAndReceiveControllerRequest(controlPlaneSocket, testableServer)
- assertTrue(controlPlaneRequest.context.fromPrivilegedListener)
-
- val interBrokerSocket = connect(testableServer, config.interBrokerListenerName,
- localAddr = InetAddress.getLocalHost)
- val interBrokerRequest = sendAndReceiveRequest(interBrokerSocket, testableServer)
- assertFalse(interBrokerRequest.context.fromPrivilegedListener)
-
- val externalSocket = connect(testableServer, new ListenerName("EXTERNAL"),
- localAddr = InetAddress.getLocalHost)
- val externalRequest = sendAndReceiveRequest(externalSocket, testableServer)
- assertFalse(externalRequest.context.fromPrivilegedListener)
- })
- }
-
private def sslServerProps: Properties = {
val trustStoreFile = File.createTempFile("truststore", ".jks")
val sslProps = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, interBrokerSecurityProtocol = Some(SecurityProtocol.SSL),
@@ -1820,10 +1745,10 @@ class SocketServerTest {
@volatile var uncaughtExceptions = 0
override def newProcessor(id: Int, requestChannel: RequestChannel, connectionQuotas: ConnectionQuotas, listenerName: ListenerName,
- protocol: SecurityProtocol, memoryPool: MemoryPool, isPrivilegedListener: Boolean = false): Processor = {
+ protocol: SecurityProtocol, memoryPool: MemoryPool): Processor = {
new Processor(id, time, config.socketRequestMaxBytes, requestChannel, connectionQuotas, config.connectionsMaxIdleMs,
config.failedAuthenticationDelayMs, listenerName, protocol, config, metrics, credentialProvider,
- memoryPool, new LogContext(), connectionQueueSize, isPrivilegedListener) {
+ memoryPool, new LogContext(), connectionQueueSize) {
override protected[network] def createSelector(channelBuilder: ChannelBuilder): Selector = {
val testableSelector = new TestableSelector(config, channelBuilder, time, metrics, metricTags.asScala)
diff --git a/core/src/test/scala/unit/kafka/security/authorizer/AclAuthorizerTest.scala b/core/src/test/scala/unit/kafka/security/authorizer/AclAuthorizerTest.scala
index a5c57b6..163b1fb 100644
--- a/core/src/test/scala/unit/kafka/security/authorizer/AclAuthorizerTest.scala
+++ b/core/src/test/scala/unit/kafka/security/authorizer/AclAuthorizerTest.scala
@@ -1037,7 +1037,7 @@ class AclAuthorizerTest extends ZooKeeperTestHarness {
val securityProtocol = SecurityProtocol.SASL_PLAINTEXT
val header = new RequestHeader(apiKey, 2, "", 1) //ApiKeys apiKey, short version, String clientId, int correlation
new RequestContext(header, "", clientAddress, principal, ListenerName.forSecurityProtocol(securityProtocol),
- securityProtocol, ClientInformation.EMPTY, false)
+ securityProtocol, ClientInformation.EMPTY)
}
private def authorize(authorizer: AclAuthorizer, requestContext: RequestContext, operation: AclOperation, resource: ResourcePattern): Boolean = {
diff --git a/core/src/test/scala/unit/kafka/server/BaseClientQuotaManagerTest.scala b/core/src/test/scala/unit/kafka/server/BaseClientQuotaManagerTest.scala
index 1851306..6d4b5be 100644
--- a/core/src/test/scala/unit/kafka/server/BaseClientQuotaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/BaseClientQuotaManagerTest.scala
@@ -60,7 +60,7 @@ class BaseClientQuotaManagerTest {
}
protected def buildRequest[T <: AbstractRequest](builder: AbstractRequest.Builder[T],
- listenerName: ListenerName = ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT)): (T, RequestChannel.Request) = {
+ listenerName: ListenerName = ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT)): (T, RequestChannel.Request) = {
val request = builder.build()
val buffer = request.serialize(new RequestHeader(builder.apiKey, request.version, "", 0))
@@ -69,7 +69,7 @@ class BaseClientQuotaManagerTest {
// read the header from the buffer first so that the body can be read next from the Request constructor
val header = RequestHeader.parse(buffer)
val context = new RequestContext(header, "1", InetAddress.getLocalHost, KafkaPrincipal.ANONYMOUS,
- listenerName, SecurityProtocol.PLAINTEXT, ClientInformation.EMPTY, false)
+ listenerName, SecurityProtocol.PLAINTEXT, ClientInformation.EMPTY)
(request, new RequestChannel.Request(processor = 1, context = context, startTimeNanos = 0, MemoryPool.NONE, buffer,
requestChannelMetrics))
}
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index 90be674..e4136ed 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -153,7 +153,7 @@ class KafkaApisTest {
clientId, 0)
val requestContext = new RequestContext(requestHeader, "1", InetAddress.getLocalHost,
KafkaPrincipal.ANONYMOUS, ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT),
- SecurityProtocol.PLAINTEXT, ClientInformation.EMPTY, false)
+ SecurityProtocol.PLAINTEXT, ClientInformation.EMPTY)
val expectedActions = Seq(
new Action(operation, new ResourcePattern(resourceType, resourceName, PatternType.LITERAL),
@@ -187,7 +187,7 @@ class KafkaApisTest {
clientId, 0)
val requestContext = new RequestContext(requestHeader, "1", InetAddress.getLocalHost,
KafkaPrincipal.ANONYMOUS, ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT),
- SecurityProtocol.PLAINTEXT, ClientInformation.EMPTY, false)
+ SecurityProtocol.PLAINTEXT, ClientInformation.EMPTY)
val expectedActions = Seq(
new Action(operation, new ResourcePattern(resourceType, resourceName1, PatternType.LITERAL),
@@ -2260,7 +2260,7 @@ class KafkaApisTest {
// read the header from the buffer first so that the body can be read next from the Request constructor
val header = RequestHeader.parse(buffer)
val context = new RequestContext(header, "1", InetAddress.getLocalHost, KafkaPrincipal.ANONYMOUS,
- listenerName, SecurityProtocol.PLAINTEXT, ClientInformation.EMPTY, false)
+ listenerName, SecurityProtocol.PLAINTEXT, ClientInformation.EMPTY)
new RequestChannel.Request(processor = 1, context = context, startTimeNanos = 0, MemoryPool.NONE, buffer,
requestChannelMetrics)
}
diff --git a/core/src/test/scala/unit/kafka/server/ThrottledChannelExpirationTest.scala b/core/src/test/scala/unit/kafka/server/ThrottledChannelExpirationTest.scala
index ff33084..bffbeec 100644
--- a/core/src/test/scala/unit/kafka/server/ThrottledChannelExpirationTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ThrottledChannelExpirationTest.scala
@@ -56,7 +56,7 @@ class ThrottledChannelExpirationTest {
// read the header from the buffer first so that the body can be read next from the Request constructor
val header = RequestHeader.parse(buffer)
val context = new RequestContext(header, "1", InetAddress.getLocalHost, KafkaPrincipal.ANONYMOUS,
- listenerName, SecurityProtocol.PLAINTEXT, ClientInformation.EMPTY, false)
+ listenerName, SecurityProtocol.PLAINTEXT, ClientInformation.EMPTY)
(request, new RequestChannel.Request(processor = 1, context = context, startTimeNanos = 0, MemoryPool.NONE, buffer,
requestChannelMetrics))
}
diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/acl/AclAuthorizerBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/acl/AclAuthorizerBenchmark.java
index 060ff3a..12ac243 100644
--- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/acl/AclAuthorizerBenchmark.java
+++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/acl/AclAuthorizerBenchmark.java
@@ -96,7 +96,7 @@ public class AclAuthorizerBenchmark {
1, true, true));
context = new RequestContext(new RequestHeader(ApiKeys.PRODUCE, Integer.valueOf(1).shortValue(),
"someclient", 1), "1", InetAddress.getLocalHost(), KafkaPrincipal.ANONYMOUS,
- ListenerName.normalised("listener"), SecurityProtocol.PLAINTEXT, ClientInformation.EMPTY, false);
+ ListenerName.normalised("listener"), SecurityProtocol.PLAINTEXT, ClientInformation.EMPTY);
}
private void setFieldValue(Object obj, String fieldName, Object value) throws Exception {
diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/MetadataRequestBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/MetadataRequestBenchmark.java
index e2242c1..95c0e50 100644
--- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/MetadataRequestBenchmark.java
+++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/MetadataRequestBenchmark.java
@@ -202,8 +202,7 @@ public class MetadataRequestBenchmark {
RequestHeader header = RequestHeader.parse(buffer);
RequestContext context = new RequestContext(header, "1", null, principal,
- ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT),
- SecurityProtocol.PLAINTEXT, ClientInformation.EMPTY, false);
+ ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT), SecurityProtocol.PLAINTEXT, ClientInformation.EMPTY);
return new RequestChannel.Request(1, context, 0, MemoryPool.NONE, buffer, requestChannelMetrics);
}