You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ma...@apache.org on 2019/06/25 13:01:28 UTC
[kafka] branch trunk updated: KAFKA-8390: Use automatic RPC
generation in CreateDelegationToken (#6828)
This is an automated email from the ASF dual-hosted git repository.
manikumar pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 14d8549 KAFKA-8390: Use automatic RPC generation in CreateDelegationToken (#6828)
14d8549 is described below
commit 14d854936e1d2fed2e69a7c6367becf360f88833
Author: Mickael Maison <mi...@users.noreply.github.com>
AuthorDate: Tue Jun 25 15:01:11 2019 +0200
KAFKA-8390: Use automatic RPC generation in CreateDelegationToken (#6828)
Reviewers: Manikumar Reddy <ma...@gmail.com>
---
.../kafka/clients/admin/KafkaAdminClient.java | 24 ++-
.../org/apache/kafka/common/protocol/ApiKeys.java | 6 +-
.../kafka/common/requests/AbstractResponse.java | 2 +-
.../requests/CreateDelegationTokenRequest.java | 94 ++----------
.../requests/CreateDelegationTokenResponse.java | 164 ++++++---------------
.../kafka/common/requests/RequestResponseTest.java | 30 +++-
core/src/main/scala/kafka/server/KafkaApis.scala | 15 +-
.../scala/unit/kafka/server/RequestQuotaTest.scala | 11 +-
8 files changed, 122 insertions(+), 224 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index e01ecf3..3fde51d 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -61,6 +61,9 @@ import org.apache.kafka.common.errors.UnknownServerException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.internals.KafkaFutureImpl;
+import org.apache.kafka.common.message.CreateDelegationTokenRequestData;
+import org.apache.kafka.common.message.CreateDelegationTokenRequestData.CreatableRenewers;
+import org.apache.kafka.common.message.CreateDelegationTokenResponseData;
import org.apache.kafka.common.message.CreateTopicsRequestData;
import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopicCollection;
import org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult;
@@ -138,6 +141,7 @@ import org.apache.kafka.common.requests.OffsetFetchRequest;
import org.apache.kafka.common.requests.OffsetFetchResponse;
import org.apache.kafka.common.requests.RenewDelegationTokenRequest;
import org.apache.kafka.common.requests.RenewDelegationTokenResponse;
+import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.kafka.common.security.token.delegation.DelegationToken;
import org.apache.kafka.common.security.token.delegation.TokenInformation;
import org.apache.kafka.common.utils.AppInfoParser;
@@ -2389,12 +2393,21 @@ public class KafkaAdminClient extends AdminClient {
public CreateDelegationTokenResult createDelegationToken(final CreateDelegationTokenOptions options) {
final KafkaFutureImpl<DelegationToken> delegationTokenFuture = new KafkaFutureImpl<>();
final long now = time.milliseconds();
+ List<CreatableRenewers> renewers = new ArrayList<>();
+ for (KafkaPrincipal principal : options.renewers()) {
+ renewers.add(new CreatableRenewers()
+ .setPrincipalName(principal.getName())
+ .setPrincipalType(principal.getPrincipalType()));
+ }
runnable.call(new Call("createDelegationToken", calcDeadlineMs(now, options.timeoutMs()),
new LeastLoadedNodeProvider()) {
@Override
- AbstractRequest.Builder createRequest(int timeoutMs) {
- return new CreateDelegationTokenRequest.Builder(options.renewers(), options.maxlifeTimeMs());
+ AbstractRequest.Builder<CreateDelegationTokenRequest> createRequest(int timeoutMs) {
+ return new CreateDelegationTokenRequest.Builder(
+ new CreateDelegationTokenRequestData()
+ .setRenewers(renewers)
+ .setMaxLifetimeMs(options.maxlifeTimeMs()));
}
@Override
@@ -2403,9 +2416,10 @@ public class KafkaAdminClient extends AdminClient {
if (response.hasError()) {
delegationTokenFuture.completeExceptionally(response.error().exception());
} else {
- TokenInformation tokenInfo = new TokenInformation(response.tokenId(), response.owner(),
- options.renewers(), response.issueTimestamp(), response.maxTimestamp(), response.expiryTimestamp());
- DelegationToken token = new DelegationToken(tokenInfo, response.hmacBytes());
+ CreateDelegationTokenResponseData data = response.data();
+ TokenInformation tokenInfo = new TokenInformation(data.tokenId(), new KafkaPrincipal(data.principalType(), data.principalName()),
+ options.renewers(), data.issueTimestampMs(), data.maxTimestampMs(), data.expiryTimestampMs());
+ DelegationToken token = new DelegationToken(tokenInfo, data.hmac());
delegationTokenFuture.complete(token);
}
}
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
index 849f268..257393b 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
@@ -18,6 +18,8 @@ package org.apache.kafka.common.protocol;
import org.apache.kafka.common.message.ControlledShutdownRequestData;
import org.apache.kafka.common.message.ControlledShutdownResponseData;
+import org.apache.kafka.common.message.CreateDelegationTokenRequestData;
+import org.apache.kafka.common.message.CreateDelegationTokenResponseData;
import org.apache.kafka.common.message.CreateTopicsRequestData;
import org.apache.kafka.common.message.CreateTopicsResponseData;
import org.apache.kafka.common.message.DeleteTopicsRequestData;
@@ -65,8 +67,6 @@ import org.apache.kafka.common.requests.ApiVersionsRequest;
import org.apache.kafka.common.requests.ApiVersionsResponse;
import org.apache.kafka.common.requests.CreateAclsRequest;
import org.apache.kafka.common.requests.CreateAclsResponse;
-import org.apache.kafka.common.requests.CreateDelegationTokenRequest;
-import org.apache.kafka.common.requests.CreateDelegationTokenResponse;
import org.apache.kafka.common.requests.CreatePartitionsRequest;
import org.apache.kafka.common.requests.CreatePartitionsResponse;
import org.apache.kafka.common.requests.DeleteAclsRequest;
@@ -185,7 +185,7 @@ public enum ApiKeys {
SaslAuthenticateResponseData.SCHEMAS),
CREATE_PARTITIONS(37, "CreatePartitions", CreatePartitionsRequest.schemaVersions(),
CreatePartitionsResponse.schemaVersions()),
- CREATE_DELEGATION_TOKEN(38, "CreateDelegationToken", CreateDelegationTokenRequest.schemaVersions(), CreateDelegationTokenResponse.schemaVersions()),
+ CREATE_DELEGATION_TOKEN(38, "CreateDelegationToken", CreateDelegationTokenRequestData.SCHEMAS, CreateDelegationTokenResponseData.SCHEMAS),
RENEW_DELEGATION_TOKEN(39, "RenewDelegationToken", RenewDelegationTokenRequest.schemaVersions(), RenewDelegationTokenResponse.schemaVersions()),
EXPIRE_DELEGATION_TOKEN(40, "ExpireDelegationToken", ExpireDelegationTokenRequest.schemaVersions(), ExpireDelegationTokenResponse.schemaVersions()),
DESCRIBE_DELEGATION_TOKEN(41, "DescribeDelegationToken", DescribeDelegationTokenRequest.schemaVersions(), DescribeDelegationTokenResponse.schemaVersions()),
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
index 6d07431..8a7be33 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
@@ -147,7 +147,7 @@ public abstract class AbstractResponse extends AbstractRequestResponse {
case CREATE_PARTITIONS:
return new CreatePartitionsResponse(struct);
case CREATE_DELEGATION_TOKEN:
- return new CreateDelegationTokenResponse(struct);
+ return new CreateDelegationTokenResponse(struct, version);
case RENEW_DELEGATION_TOKEN:
return new RenewDelegationTokenResponse(struct);
case EXPIRE_DELEGATION_TOKEN:
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/CreateDelegationTokenRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/CreateDelegationTokenRequest.java
index 3277f10..61b404a 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/CreateDelegationTokenRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/CreateDelegationTokenRequest.java
@@ -16,126 +16,62 @@
*/
package org.apache.kafka.common.requests;
+import org.apache.kafka.common.message.CreateDelegationTokenRequestData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
-import org.apache.kafka.common.protocol.types.ArrayOf;
-import org.apache.kafka.common.protocol.types.Field;
-import org.apache.kafka.common.protocol.types.Schema;
import org.apache.kafka.common.protocol.types.Struct;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
-
-import static org.apache.kafka.common.protocol.types.Type.INT64;
-import static org.apache.kafka.common.protocol.CommonFields.PRINCIPAL_TYPE;
-import static org.apache.kafka.common.protocol.CommonFields.PRINCIPAL_NAME;
public class CreateDelegationTokenRequest extends AbstractRequest {
- private static final String RENEWERS_KEY_NAME = "renewers";
- private static final String MAX_LIFE_TIME_KEY_NAME = "max_life_time";
-
- private static final Schema TOKEN_CREATE_REQUEST_V0 = new Schema(
- new Field(RENEWERS_KEY_NAME, new ArrayOf(new Schema(PRINCIPAL_TYPE, PRINCIPAL_NAME)),
- "An array of token renewers. Renewer is an Kafka PrincipalType and name string," +
- " who is allowed to renew this token before the max lifetime expires."),
- new Field(MAX_LIFE_TIME_KEY_NAME, INT64,
- "Max lifetime period for token in milli seconds. if value is -1, then max lifetime" +
- " will default to a server side config value."));
- /**
- * The version number is bumped to indicate that on quota violation brokers send out responses before throttling.
- */
- private static final Schema TOKEN_CREATE_REQUEST_V1 = TOKEN_CREATE_REQUEST_V0;
+ private final CreateDelegationTokenRequestData data;
- private final List<KafkaPrincipal> renewers;
- private final long maxLifeTime;
-
- private CreateDelegationTokenRequest(short version, List<KafkaPrincipal> renewers, long maxLifeTime) {
+ private CreateDelegationTokenRequest(CreateDelegationTokenRequestData data, short version) {
super(ApiKeys.CREATE_DELEGATION_TOKEN, version);
- this.maxLifeTime = maxLifeTime;
- this.renewers = renewers;
+ this.data = data;
}
public CreateDelegationTokenRequest(Struct struct, short version) {
super(ApiKeys.CREATE_DELEGATION_TOKEN, version);
- maxLifeTime = struct.getLong(MAX_LIFE_TIME_KEY_NAME);
- Object[] renewerArray = struct.getArray(RENEWERS_KEY_NAME);
- renewers = new ArrayList<>();
- if (renewerArray != null) {
- for (Object renewerObj : renewerArray) {
- Struct renewerObjStruct = (Struct) renewerObj;
- String principalType = renewerObjStruct.get(PRINCIPAL_TYPE);
- String principalName = renewerObjStruct.get(PRINCIPAL_NAME);
- renewers.add(new KafkaPrincipal(principalType, principalName));
- }
- }
+ this.data = new CreateDelegationTokenRequestData(struct, version);
}
public static CreateDelegationTokenRequest parse(ByteBuffer buffer, short version) {
return new CreateDelegationTokenRequest(ApiKeys.CREATE_DELEGATION_TOKEN.parseRequest(version, buffer), version);
}
- public static Schema[] schemaVersions() {
- return new Schema[]{TOKEN_CREATE_REQUEST_V0, TOKEN_CREATE_REQUEST_V1};
- }
-
@Override
protected Struct toStruct() {
- short version = version();
- Struct struct = new Struct(ApiKeys.CREATE_DELEGATION_TOKEN.requestSchema(version));
- Object[] renewersArray = new Object[renewers.size()];
-
- int i = 0;
- for (KafkaPrincipal principal: renewers) {
- Struct renewerStruct = struct.instance(RENEWERS_KEY_NAME);
- renewerStruct.set(PRINCIPAL_TYPE, principal.getPrincipalType());
- renewerStruct.set(PRINCIPAL_NAME, principal.getName());
- renewersArray[i++] = renewerStruct;
- }
+ return data.toStruct(version());
+ }
- struct.set(RENEWERS_KEY_NAME, renewersArray);
- struct.set(MAX_LIFE_TIME_KEY_NAME, maxLifeTime);
- return struct;
+ public CreateDelegationTokenRequestData data() {
+ return data;
}
@Override
public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
- return new CreateDelegationTokenResponse(throttleTimeMs, Errors.forException(e), KafkaPrincipal.ANONYMOUS);
- }
-
- public List<KafkaPrincipal> renewers() {
- return renewers;
- }
-
- public long maxLifeTime() {
- return maxLifeTime;
+ return CreateDelegationTokenResponse.prepareResponse(throttleTimeMs, Errors.forException(e), KafkaPrincipal.ANONYMOUS);
}
public static class Builder extends AbstractRequest.Builder<CreateDelegationTokenRequest> {
- private final List<KafkaPrincipal> renewers;
- private final long maxLifeTime;
+ private final CreateDelegationTokenRequestData data;
- public Builder(List<KafkaPrincipal> renewers, long maxLifeTime) {
+ public Builder(CreateDelegationTokenRequestData data) {
super(ApiKeys.CREATE_DELEGATION_TOKEN);
- this.renewers = renewers;
- this.maxLifeTime = maxLifeTime;
+ this.data = data;
}
@Override
public CreateDelegationTokenRequest build(short version) {
- return new CreateDelegationTokenRequest(version, renewers, maxLifeTime);
+ return new CreateDelegationTokenRequest(data, version);
}
@Override
public String toString() {
- StringBuilder bld = new StringBuilder();
- bld.append("(type: CreateDelegationTokenRequest").
- append(", renewers=").append(renewers).
- append(", maxLifeTime=").append(maxLifeTime).
- append(")");
- return bld.toString();
+ return data.toString();
}
}
}
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/CreateDelegationTokenResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/CreateDelegationTokenResponse.java
index 8bb6631..04db17a 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/CreateDelegationTokenResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/CreateDelegationTokenResponse.java
@@ -16,160 +16,82 @@
*/
package org.apache.kafka.common.requests;
+import org.apache.kafka.common.message.CreateDelegationTokenResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
-import org.apache.kafka.common.protocol.types.Field;
-import org.apache.kafka.common.protocol.types.Schema;
import org.apache.kafka.common.protocol.types.Struct;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
import java.nio.ByteBuffer;
+import java.util.Collections;
import java.util.Map;
-import static org.apache.kafka.common.protocol.CommonFields.ERROR_CODE;
-import static org.apache.kafka.common.protocol.CommonFields.PRINCIPAL_NAME;
-import static org.apache.kafka.common.protocol.CommonFields.PRINCIPAL_TYPE;
-import static org.apache.kafka.common.protocol.CommonFields.THROTTLE_TIME_MS;
-import static org.apache.kafka.common.protocol.types.Type.BYTES;
-import static org.apache.kafka.common.protocol.types.Type.INT64;
-import static org.apache.kafka.common.protocol.types.Type.STRING;
-
public class CreateDelegationTokenResponse extends AbstractResponse {
- private static final String OWNER_KEY_NAME = "owner";
- private static final String ISSUE_TIMESTAMP_KEY_NAME = "issue_timestamp";
- private static final String EXPIRY_TIMESTAMP_NAME = "expiry_timestamp";
- private static final String MAX_TIMESTAMP_NAME = "max_timestamp";
- private static final String TOKEN_ID_KEY_NAME = "token_id";
- private static final String HMAC_KEY_NAME = "hmac";
-
- private final Errors error;
- private final long issueTimestamp;
- private final long expiryTimestamp;
- private final long maxTimestamp;
- private final String tokenId;
- private final ByteBuffer hmac;
- private final int throttleTimeMs;
- private KafkaPrincipal owner;
-
- private static final Schema TOKEN_CREATE_RESPONSE_V0 = new Schema(
- ERROR_CODE,
- new Field(OWNER_KEY_NAME, new Schema(PRINCIPAL_TYPE, PRINCIPAL_NAME), "token owner."),
- new Field(ISSUE_TIMESTAMP_KEY_NAME, INT64, "timestamp (in msec) when this token was generated."),
- new Field(EXPIRY_TIMESTAMP_NAME, INT64, "timestamp (in msec) at which this token expires."),
- new Field(MAX_TIMESTAMP_NAME, INT64, "max life time of this token."),
- new Field(TOKEN_ID_KEY_NAME, STRING, "UUID to ensure uniqueness."),
- new Field(HMAC_KEY_NAME, BYTES, "HMAC of the delegation token."),
- THROTTLE_TIME_MS);
-
- /**
- * The version number is bumped to indicate that on quota violation brokers send out responses before throttling.
- */
- private static final Schema TOKEN_CREATE_RESPONSE_V1 = TOKEN_CREATE_RESPONSE_V0;
-
- public CreateDelegationTokenResponse(int throttleTimeMs,
- Errors error,
- KafkaPrincipal owner,
- long issueTimestamp,
- long expiryTimestamp,
- long maxTimestamp,
- String tokenId,
- ByteBuffer hmac) {
- this.throttleTimeMs = throttleTimeMs;
- this.error = error;
- this.owner = owner;
- this.issueTimestamp = issueTimestamp;
- this.expiryTimestamp = expiryTimestamp;
- this.maxTimestamp = maxTimestamp;
- this.tokenId = tokenId;
- this.hmac = hmac;
- }
+ private final CreateDelegationTokenResponseData data;
- public CreateDelegationTokenResponse(int throttleTimeMs, Errors error, KafkaPrincipal owner) {
- this(throttleTimeMs, error, owner, -1, -1, -1, "", ByteBuffer.wrap(new byte[] {}));
+ public CreateDelegationTokenResponse(CreateDelegationTokenResponseData data) {
+ this.data = data;
}
- public CreateDelegationTokenResponse(Struct struct) {
- error = Errors.forCode(struct.get(ERROR_CODE));
- Struct ownerStruct = (Struct) struct.get(OWNER_KEY_NAME);
- String principalType = ownerStruct.get(PRINCIPAL_TYPE);
- String principalName = ownerStruct.get(PRINCIPAL_NAME);
- owner = new KafkaPrincipal(principalType, principalName);
- issueTimestamp = struct.getLong(ISSUE_TIMESTAMP_KEY_NAME);
- expiryTimestamp = struct.getLong(EXPIRY_TIMESTAMP_NAME);
- maxTimestamp = struct.getLong(MAX_TIMESTAMP_NAME);
- tokenId = struct.getString(TOKEN_ID_KEY_NAME);
- hmac = struct.getBytes(HMAC_KEY_NAME);
- this.throttleTimeMs = struct.getOrElse(THROTTLE_TIME_MS, DEFAULT_THROTTLE_TIME);
+ public CreateDelegationTokenResponse(Struct struct, short version) {
+ this.data = new CreateDelegationTokenResponseData(struct, version);
}
public static CreateDelegationTokenResponse parse(ByteBuffer buffer, short version) {
- return new CreateDelegationTokenResponse(ApiKeys.CREATE_DELEGATION_TOKEN.responseSchema(version).read(buffer));
- }
-
- public static Schema[] schemaVersions() {
- return new Schema[] {TOKEN_CREATE_RESPONSE_V0, TOKEN_CREATE_RESPONSE_V1};
- }
-
- @Override
- public Map<Errors, Integer> errorCounts() {
- return errorCounts(error);
- }
-
- @Override
- protected Struct toStruct(short version) {
- Struct struct = new Struct(ApiKeys.CREATE_DELEGATION_TOKEN.responseSchema(version));
- struct.set(ERROR_CODE, error.code());
- Struct ownerStruct = struct.instance(OWNER_KEY_NAME);
- ownerStruct.set(PRINCIPAL_TYPE, owner.getPrincipalType());
- ownerStruct.set(PRINCIPAL_NAME, owner.getName());
- struct.set(OWNER_KEY_NAME, ownerStruct);
- struct.set(ISSUE_TIMESTAMP_KEY_NAME, issueTimestamp);
- struct.set(EXPIRY_TIMESTAMP_NAME, expiryTimestamp);
- struct.set(MAX_TIMESTAMP_NAME, maxTimestamp);
- struct.set(TOKEN_ID_KEY_NAME, tokenId);
- struct.set(HMAC_KEY_NAME, hmac);
- struct.setIfExists(THROTTLE_TIME_MS, throttleTimeMs);
- return struct;
+ return new CreateDelegationTokenResponse(ApiKeys.CREATE_DELEGATION_TOKEN.responseSchema(version).read(buffer), version);
}
- public Errors error() {
- return error;
- }
-
- public KafkaPrincipal owner() {
- return owner;
+ public static CreateDelegationTokenResponse prepareResponse(int throttleTimeMs,
+ Errors error,
+ KafkaPrincipal owner,
+ long issueTimestamp,
+ long expiryTimestamp,
+ long maxTimestamp,
+ String tokenId,
+ ByteBuffer hmac) {
+ CreateDelegationTokenResponseData data = new CreateDelegationTokenResponseData()
+ .setThrottleTimeMs(throttleTimeMs)
+ .setErrorCode(error.code())
+ .setPrincipalType(owner.getPrincipalType())
+ .setPrincipalName(owner.getName())
+ .setIssueTimestampMs(issueTimestamp)
+ .setExpiryTimestampMs(expiryTimestamp)
+ .setMaxTimestampMs(maxTimestamp)
+ .setTokenId(tokenId)
+ .setHmac(hmac.array());
+ return new CreateDelegationTokenResponse(data);
}
- public long issueTimestamp() {
- return issueTimestamp;
+ public static CreateDelegationTokenResponse prepareResponse(int throttleTimeMs, Errors error, KafkaPrincipal owner) {
+ return prepareResponse(throttleTimeMs, error, owner, -1, -1, -1, "", ByteBuffer.wrap(new byte[] {}));
}
- public long expiryTimestamp() {
- return expiryTimestamp;
+ public CreateDelegationTokenResponseData data() {
+ return data;
}
- public long maxTimestamp() {
- return maxTimestamp;
- }
-
- public String tokenId() {
- return tokenId;
+ @Override
+ public Map<Errors, Integer> errorCounts() {
+ return Collections.singletonMap(error(), 1);
}
- public byte[] hmacBytes() {
- byte[] byteArray = new byte[hmac.remaining()];
- hmac.get(byteArray);
- return byteArray;
+ @Override
+ protected Struct toStruct(short version) {
+ return data.toStruct(version);
}
@Override
public int throttleTimeMs() {
- return throttleTimeMs;
+ return data.throttleTimeMs();
+ }
+
+ public Errors error() {
+ return Errors.forCode(data.errorCode());
}
public boolean hasError() {
- return this.error != Errors.NONE;
+ return error() != Errors.NONE;
}
@Override
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index 11d85d6..1fb0880 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -37,6 +37,9 @@ import org.apache.kafka.common.message.ControlledShutdownRequestData;
import org.apache.kafka.common.message.ControlledShutdownResponseData.RemainingPartition;
import org.apache.kafka.common.message.ControlledShutdownResponseData.RemainingPartitionCollection;
import org.apache.kafka.common.message.ControlledShutdownResponseData;
+import org.apache.kafka.common.message.CreateDelegationTokenRequestData;
+import org.apache.kafka.common.message.CreateDelegationTokenRequestData.CreatableRenewers;
+import org.apache.kafka.common.message.CreateDelegationTokenResponseData;
import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableReplicaAssignment;
import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic;
import org.apache.kafka.common.message.CreateTopicsRequestData.CreateableTopicConfig;
@@ -1484,15 +1487,30 @@ public class RequestResponseTest {
}
private CreateDelegationTokenRequest createCreateTokenRequest() {
- List<KafkaPrincipal> renewers = new ArrayList<>();
- renewers.add(SecurityUtils.parseKafkaPrincipal("User:user1"));
- renewers.add(SecurityUtils.parseKafkaPrincipal("User:user2"));
- return new CreateDelegationTokenRequest.Builder(renewers, System.currentTimeMillis()).build();
+ List<CreatableRenewers> renewers = new ArrayList<>();
+ renewers.add(new CreatableRenewers()
+ .setPrincipalType("User")
+ .setPrincipalName("user1"));
+ renewers.add(new CreatableRenewers()
+ .setPrincipalType("User")
+ .setPrincipalName("user2"));
+ return new CreateDelegationTokenRequest.Builder(new CreateDelegationTokenRequestData()
+ .setRenewers(renewers)
+ .setMaxLifetimeMs(System.currentTimeMillis())).build();
}
private CreateDelegationTokenResponse createCreateTokenResponse() {
- return new CreateDelegationTokenResponse(20, Errors.NONE, SecurityUtils.parseKafkaPrincipal("User:user1"), System.currentTimeMillis(),
- System.currentTimeMillis(), System.currentTimeMillis(), "token1", ByteBuffer.wrap("test".getBytes()));
+ CreateDelegationTokenResponseData data = new CreateDelegationTokenResponseData()
+ .setThrottleTimeMs(20)
+ .setErrorCode(Errors.NONE.code())
+ .setPrincipalType("User")
+ .setPrincipalName("user1")
+ .setIssueTimestampMs(System.currentTimeMillis())
+ .setExpiryTimestampMs(System.currentTimeMillis())
+ .setMaxTimestampMs(System.currentTimeMillis())
+ .setTokenId("token1")
+ .setHmac("test".getBytes());
+ return new CreateDelegationTokenResponse(data);
}
private RenewDelegationTokenRequest createRenewTokenRequest() {
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index fa45cf5..e1000d0 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -2337,25 +2337,26 @@ class KafkaApis(val requestChannel: RequestChannel,
trace("Sending create token response for correlation id %d to client %s."
.format(request.header.correlationId, request.header.clientId))
sendResponseMaybeThrottle(request, requestThrottleMs =>
- new CreateDelegationTokenResponse(requestThrottleMs, createResult.error, request.session.principal, createResult.issueTimestamp,
+ CreateDelegationTokenResponse.prepareResponse(requestThrottleMs, createResult.error, request.session.principal, createResult.issueTimestamp,
createResult.expiryTimestamp, createResult.maxTimestamp, createResult.tokenId, ByteBuffer.wrap(createResult.hmac)))
}
if (!allowTokenRequests(request))
sendResponseMaybeThrottle(request, requestThrottleMs =>
- new CreateDelegationTokenResponse(requestThrottleMs, Errors.DELEGATION_TOKEN_REQUEST_NOT_ALLOWED, request.session.principal))
+ CreateDelegationTokenResponse.prepareResponse(requestThrottleMs, Errors.DELEGATION_TOKEN_REQUEST_NOT_ALLOWED, request.session.principal))
else {
- val renewerList = createTokenRequest.renewers().asScala.toList
+ val renewerList = createTokenRequest.data.renewers.asScala.toList.map(entry =>
+ new KafkaPrincipal(entry.principalType, entry.principalName))
- if (renewerList.exists(principal => principal.getPrincipalType != KafkaPrincipal.USER_TYPE)) {
+ if (renewerList.exists(principal => principal.getPrincipalType != KafkaPrincipal.USER_TYPE)) {
sendResponseMaybeThrottle(request, requestThrottleMs =>
- new CreateDelegationTokenResponse(requestThrottleMs, Errors.INVALID_PRINCIPAL_TYPE, request.session.principal))
+ CreateDelegationTokenResponse.prepareResponse(requestThrottleMs, Errors.INVALID_PRINCIPAL_TYPE, request.session.principal))
}
else {
tokenManager.createToken(
request.session.principal,
- createTokenRequest.renewers().asScala.toList,
- createTokenRequest.maxLifeTime(),
+ renewerList,
+ createTokenRequest.data.maxLifetimeMs,
sendResponseCallback
)
}
diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
index c0d9b44..493b3e3 100644
--- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
+++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
@@ -29,6 +29,7 @@ import org.apache.kafka.common.config.ConfigResource
import org.apache.kafka.common.message.ControlledShutdownRequestData
import org.apache.kafka.common.message.CreateTopicsRequestData
import org.apache.kafka.common.message.CreateTopicsRequestData.{CreatableTopic, CreatableTopicCollection}
+import org.apache.kafka.common.message.CreateDelegationTokenRequestData
import org.apache.kafka.common.message.DeleteTopicsRequestData
import org.apache.kafka.common.message.DescribeGroupsRequestData
import org.apache.kafka.common.message.FindCoordinatorRequestData
@@ -429,7 +430,13 @@ class RequestQuotaTest extends BaseRequestTest {
)
case ApiKeys.CREATE_DELEGATION_TOKEN =>
- new CreateDelegationTokenRequest.Builder(Collections.singletonList(SecurityUtils.parseKafkaPrincipal("User:test")), 1000)
+ new CreateDelegationTokenRequest.Builder(
+ new CreateDelegationTokenRequestData()
+ .setRenewers(Collections.singletonList(new CreateDelegationTokenRequestData.CreatableRenewers()
+ .setPrincipalType("User")
+ .setPrincipalName("test")))
+ .setMaxLifetimeMs(1000)
+ )
case ApiKeys.EXPIRE_DELEGATION_TOKEN =>
new ExpireDelegationTokenRequest.Builder("".getBytes, 1000)
@@ -548,7 +555,7 @@ class RequestQuotaTest extends BaseRequestTest {
case ApiKeys.ALTER_REPLICA_LOG_DIRS => new AlterReplicaLogDirsResponse(response).throttleTimeMs
case ApiKeys.DESCRIBE_LOG_DIRS => new DescribeLogDirsResponse(response).throttleTimeMs
case ApiKeys.CREATE_PARTITIONS => new CreatePartitionsResponse(response).throttleTimeMs
- case ApiKeys.CREATE_DELEGATION_TOKEN => new CreateDelegationTokenResponse(response).throttleTimeMs
+ case ApiKeys.CREATE_DELEGATION_TOKEN => new CreateDelegationTokenResponse(response, ApiKeys.CREATE_DELEGATION_TOKEN.latestVersion).throttleTimeMs
case ApiKeys.DESCRIBE_DELEGATION_TOKEN=> new DescribeDelegationTokenResponse(response).throttleTimeMs
case ApiKeys.EXPIRE_DELEGATION_TOKEN => new ExpireDelegationTokenResponse(response).throttleTimeMs
case ApiKeys.RENEW_DELEGATION_TOKEN => new RenewDelegationTokenResponse(response).throttleTimeMs