You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2017/05/24 22:28:55 UTC
[5/5] kafka git commit: KAFKA-5259; TransactionalId auth implies ProducerId auth
KAFKA-5259; TransactionalId auth implies ProducerId auth
Author: Jason Gustafson <ja...@confluent.io>
Reviewers: Apurva Mehta <ap...@confluent.io>, Jun Rao <ju...@gmail.com>
Closes #3075 from hachikuji/KAFKA-5259-FIXED
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/38f6cae9
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/38f6cae9
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/38f6cae9
Branch: refs/heads/trunk
Commit: 38f6cae9e879baa35c5dbc5829bf09ecd59930c2
Parents: 8820093
Author: Jason Gustafson <ja...@confluent.io>
Authored: Wed May 24 15:26:46 2017 -0700
Committer: Jason Gustafson <ja...@confluent.io>
Committed: Wed May 24 15:26:46 2017 -0700
----------------------------------------------------------------------
.../apache/kafka/clients/ClientResponse.java | 7 +-
.../kafka/clients/admin/AclOperation.java | 7 +-
.../clients/consumer/internals/Fetcher.java | 2 +-
.../kafka/clients/producer/KafkaProducer.java | 35 +-
.../clients/producer/internals/Sender.java | 169 +-
.../producer/internals/TransactionManager.java | 268 ++--
.../ProducerIdAuthorizationException.java | 23 -
.../TransactionalIdAuthorizationException.java | 2 +-
.../apache/kafka/common/protocol/Errors.java | 13 +-
.../apache/kafka/common/protocol/Protocol.java | 3 +
.../common/requests/AddOffsetsToTxnRequest.java | 11 +
.../requests/AddOffsetsToTxnResponse.java | 5 +-
.../requests/AddPartitionsToTxnRequest.java | 11 +
.../requests/AddPartitionsToTxnResponse.java | 3 +-
.../kafka/common/requests/EndTxnRequest.java | 11 +
.../kafka/common/requests/EndTxnResponse.java | 3 +-
.../common/requests/InitProducerIdResponse.java | 12 +-
.../kafka/common/requests/ProduceRequest.java | 5 +-
.../kafka/common/requests/ProduceResponse.java | 3 +
.../common/requests/TxnOffsetCommitRequest.java | 31 +-
.../requests/TxnOffsetCommitResponse.java | 1 +
.../requests/WriteTxnMarkersResponse.java | 1 +
.../kafka/clients/admin/AclOperationTest.java | 3 +-
.../clients/producer/internals/SenderTest.java | 183 +--
.../internals/TransactionManagerTest.java | 445 ++++--
.../common/requests/RequestResponseTest.java | 10 +-
.../src/main/scala/kafka/admin/AclCommand.scala | 54 +-
.../kafka/coordinator/group/GroupMetadata.scala | 43 +-
.../group/GroupMetadataManager.scala | 5 +-
.../coordinator/group/MemberMetadata.scala | 14 +-
.../scala/kafka/security/auth/Operation.scala | 6 +-
.../scala/kafka/security/auth/Resource.scala | 3 +-
.../kafka/security/auth/ResourceType.scala | 16 +-
.../src/main/scala/kafka/server/KafkaApis.scala | 316 ++--
.../kafka/api/AuthorizerIntegrationTest.scala | 292 +++-
.../kafka/api/TransactionsBounceTest.scala | 31 +-
.../kafka/api/TransactionsTest.scala | 7 +-
.../scala/unit/kafka/admin/AclCommandTest.scala | 36 +-
.../group/GroupCoordinatorResponseTest.scala | 1492 ------------------
.../group/GroupCoordinatorTest.scala | 1492 ++++++++++++++++++
.../coordinator/group/GroupMetadataTest.scala | 2 +-
.../unit/kafka/server/RequestQuotaTest.scala | 9 +-
.../test/scala/unit/kafka/utils/TestUtils.scala | 10 +-
43 files changed, 2771 insertions(+), 2324 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/38f6cae9/clients/src/main/java/org/apache/kafka/clients/ClientResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/ClientResponse.java b/clients/src/main/java/org/apache/kafka/clients/ClientResponse.java
index 715eae7..0ff30e9 100644
--- a/clients/src/main/java/org/apache/kafka/clients/ClientResponse.java
+++ b/clients/src/main/java/org/apache/kafka/clients/ClientResponse.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.clients;
+import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.requests.AbstractResponse;
import org.apache.kafka.common.requests.RequestHeader;
@@ -31,7 +32,7 @@ public class ClientResponse {
private final long receivedTimeMs;
private final long latencyMs;
private final boolean disconnected;
- private final RuntimeException versionMismatch;
+ private final UnsupportedVersionException versionMismatch;
private final AbstractResponse responseBody;
/**
@@ -51,7 +52,7 @@ public class ClientResponse {
long createdTimeMs,
long receivedTimeMs,
boolean disconnected,
- RuntimeException versionMismatch,
+ UnsupportedVersionException versionMismatch,
AbstractResponse responseBody) {
this.requestHeader = requestHeader;
this.callback = callback;
@@ -71,7 +72,7 @@ public class ClientResponse {
return disconnected;
}
- public RuntimeException versionMismatch() {
+ public UnsupportedVersionException versionMismatch() {
return versionMismatch;
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/38f6cae9/clients/src/main/java/org/apache/kafka/clients/admin/AclOperation.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/AclOperation.java b/clients/src/main/java/org/apache/kafka/clients/admin/AclOperation.java
index 062e5e3..0c3ff50 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/AclOperation.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/AclOperation.java
@@ -83,7 +83,12 @@ public enum AclOperation {
/**
* ALTER_CONFIGS operation.
*/
- ALTER_CONFIGS((byte) 11);
+ ALTER_CONFIGS((byte) 11),
+
+ /**
+ * IDEMPOTENT_WRITE operation.
+ */
+ IDEMPOTENT_WRITE((byte) 12);
private final static HashMap<Byte, AclOperation> CODE_TO_VALUE = new HashMap<>();
http://git-wip-us.apache.org/repos/asf/kafka/blob/38f6cae9/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index 509993f..6917a1d 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -416,7 +416,7 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
}
// we might lose the assignment while fetching the offset, so check it is still active
if (subscriptions.isAssigned(partition)) {
- log.debug("Resetting offset for partition {} to {} offset.", partition, offsetData.offset);
+ log.debug("Resetting offset for partition {} to offset {}.", partition, offsetData.offset);
this.subscriptions.seek(partition, offsetData.offset);
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/38f6cae9/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index ac0169a..c11ecc7 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -51,7 +51,6 @@ import org.apache.kafka.common.metrics.MetricsReporter;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.network.ChannelBuilder;
import org.apache.kafka.common.network.Selector;
-import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.AbstractRecords;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.RecordBatch;
@@ -607,7 +606,9 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
* Implementation of asynchronously send a record to a topic.
*/
private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
- ensureProperTransactionalState();
+ if (transactionManager != null)
+ ensureProperTransactionalState();
+
TopicPartition tp = null;
try {
// first make sure the metadata for the topic is available
@@ -642,9 +643,9 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
long timestamp = record.timestamp() == null ? time.milliseconds() : record.timestamp();
log.trace("Sending record {} with callback {} to topic {} partition {}", record, callback, record.topic(), partition);
// producer callback will make sure to call both 'callback' and interceptor callback
- Callback interceptCallback = new InterceptorCallback<>(callback, this.interceptors, tp, transactionManager);
+ Callback interceptCallback = this.interceptors == null ? callback : new InterceptorCallback<>(callback, this.interceptors, tp);
- if (transactionManager != null)
+ if (transactionManager != null && transactionManager.isTransactional())
transactionManager.maybeAddPartitionToTransaction(tp);
RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey,
@@ -690,27 +691,17 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
}
private void ensureProperTransactionalState() {
- if (transactionManager == null)
- return;
-
if (transactionManager.isTransactional() && !transactionManager.hasProducerId())
- throw new IllegalStateException("Cannot perform a 'send' before completing a call to initTransactions when transactions are enabled.");
-
- if (transactionManager.isFenced())
- throw Errors.INVALID_PRODUCER_EPOCH.exception();
+ throw new IllegalStateException("Cannot perform a 'send' before completing a call to initTransactions " +
+ "when transactions are enabled.");
if (transactionManager.isInErrorState()) {
- String errorMessage =
- "Cannot perform send because at least one previous transactional or idempotent request has failed with errors.";
Exception lastError = transactionManager.lastError();
- if (lastError != null)
- throw new KafkaException(errorMessage, lastError);
- else
- throw new KafkaException(errorMessage);
+ throw new KafkaException("Cannot perform send because at least one previous transactional or " +
+ "idempotent request has failed with errors.", lastError);
}
if (transactionManager.isCompletingTransaction())
throw new IllegalStateException("Cannot call send while a commit or abort is in progress.");
-
}
private void setReadOnly(Headers headers) {
@@ -1013,14 +1004,11 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
private final Callback userCallback;
private final ProducerInterceptors<K, V> interceptors;
private final TopicPartition tp;
- private final TransactionManager transactionManager;
- public InterceptorCallback(Callback userCallback, ProducerInterceptors<K, V> interceptors,
- TopicPartition tp, TransactionManager transactionManager) {
+ private InterceptorCallback(Callback userCallback, ProducerInterceptors<K, V> interceptors, TopicPartition tp) {
this.userCallback = userCallback;
this.interceptors = interceptors;
this.tp = tp;
- this.transactionManager = transactionManager;
}
public void onCompletion(RecordMetadata metadata, Exception exception) {
@@ -1034,9 +1022,6 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
}
if (this.userCallback != null)
this.userCallback.onCompletion(metadata, exception);
-
- if (exception != null && transactionManager != null)
- transactionManager.setError(exception);
}
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/38f6cae9/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
index 4c3b99d..116a1c5 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
@@ -24,15 +24,18 @@ import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.NetworkClientUtils;
import org.apache.kafka.clients.RequestCompletionHandler;
import org.apache.kafka.common.Cluster;
-import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.ClusterAuthorizationException;
import org.apache.kafka.common.errors.InvalidMetadataException;
import org.apache.kafka.common.errors.OutOfOrderSequenceException;
+import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.errors.RetriableException;
import org.apache.kafka.common.errors.TopicAuthorizationException;
+import org.apache.kafka.common.errors.TransactionalIdAuthorizationException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
+import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.metrics.Measurable;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
@@ -189,28 +192,34 @@ public class Sender implements Runnable {
* @param now The current POSIX time in milliseconds
*/
void run(long now) {
- long pollTimeout = retryBackoffMs;
- if (!maybeSendTransactionalRequest(now)) {
- pollTimeout = sendProducerData(now);
+ if (transactionManager != null) {
+ if (!transactionManager.isTransactional()) {
+ // this is an idempotent producer, so make sure we have a producer id
+ maybeWaitForProducerId();
+ } else if (transactionManager.hasInflightRequest() || maybeSendTransactionalRequest(now)) {
+ // as long as there are outstanding transactional requests, we simply wait for them to return
+ client.poll(retryBackoffMs, now);
+ return;
+ }
+
+ // do not continue sending if the transaction manager is in a failed state or if there
+ // is no producer id (for the idempotent case).
+ if (transactionManager.isInErrorState() || !transactionManager.hasProducerId()) {
+ RuntimeException lastError = transactionManager.lastError();
+ if (lastError != null)
+ maybeAbortBatches(lastError);
+ client.poll(retryBackoffMs, now);
+ return;
+ }
}
+ long pollTimeout = sendProducerData(now);
log.trace("waiting {}ms in poll", pollTimeout);
- this.client.poll(pollTimeout, now);
+ client.poll(pollTimeout, now);
}
-
private long sendProducerData(long now) {
Cluster cluster = metadata.fetch();
- maybeWaitForProducerId();
-
- if (transactionManager != null && transactionManager.isInErrorState()) {
- final KafkaException exception = transactionManager.lastError() instanceof KafkaException
- ? (KafkaException) transactionManager.lastError()
- : new KafkaException(transactionManager.lastError());
- log.error("aborting producer batches because the transaction manager is in an error state.", exception);
- this.accumulator.abortBatches(exception);
- return Long.MAX_VALUE;
- }
// get the list of partitions with data ready to send
RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);
@@ -286,22 +295,13 @@ public class Sender implements Runnable {
}
private boolean maybeSendTransactionalRequest(long now) {
- if (transactionManager == null || !transactionManager.isTransactional())
- return false;
-
- if (transactionManager.hasInflightRequest()) {
- log.trace("TransactionalId: {} -- There is already an inflight transactional request. Going to wait for the response.",
+ TransactionManager.TxnRequestHandler nextRequestHandler = transactionManager.nextRequestHandler();
+ if (nextRequestHandler == null) {
+ log.trace("TransactionalId: {} -- There are no pending transactional requests to send",
transactionManager.transactionalId());
- return true;
- }
-
- if (!transactionManager.hasPendingTransactionalRequests()) {
- log.trace("TransactionalId: {} -- There are no pending transactional requests to send", transactionManager.transactionalId());
return false;
}
- TransactionManager.TxnRequestHandler nextRequestHandler = transactionManager.nextRequestHandler();
-
if (nextRequestHandler.isEndTxn() && transactionManager.isCompletingTransaction() && accumulator.hasUnflushedBatches()) {
if (!accumulator.flushInProgress())
accumulator.beginFlush();
@@ -311,15 +311,11 @@ public class Sender implements Runnable {
return false;
}
- if (transactionManager.maybeTerminateRequestWithError(nextRequestHandler)) {
- log.trace("TransactionalId: {} -- Not sending a transactional request because we are in an error state",
- transactionManager.transactionalId());
- return false;
- }
-
- Node targetNode = null;
+ log.debug("TransactionalId: {} -- Sending transactional request {}", transactionManager.transactionalId(),
+ nextRequestHandler.requestBuilder());
- while (targetNode == null) {
+ while (true) {
+ Node targetNode = null;
try {
if (nextRequestHandler.needsCoordinator()) {
targetNode = transactionManager.coordinator(nextRequestHandler.coordinatorType());
@@ -340,8 +336,8 @@ public class Sender implements Runnable {
transactionManager.transactionalId(), retryBackoffMs, nextRequestHandler.requestBuilder());
time.sleep(retryBackoffMs);
}
- ClientRequest clientRequest = client.newClientRequest(targetNode.idString(), nextRequestHandler.requestBuilder(),
- now, true, nextRequestHandler);
+ ClientRequest clientRequest = client.newClientRequest(targetNode.idString(),
+ nextRequestHandler.requestBuilder(), now, true, nextRequestHandler);
transactionManager.setInFlightRequestCorrelationId(clientRequest.correlationId());
log.trace("TransactionalId: {} -- Sending transactional request {} to node {}", transactionManager.transactionalId(),
nextRequestHandler.requestBuilder(), clientRequest.destination());
@@ -349,9 +345,9 @@ public class Sender implements Runnable {
return true;
}
} catch (IOException e) {
- targetNode = null;
- log.warn("TransactionalId: " + transactionManager.transactionalId() + " -- Got an exception when trying " +
- "to find a node to send transactional request " + nextRequestHandler.requestBuilder() + ". Going to back off and retry", e);
+ log.debug("TransactionalId: {} -- Disconnect from {} while trying to send transactional " +
+ "request {}. Going to back off and retry", transactionManager.transactionalId(),
+ targetNode, nextRequestHandler.requestBuilder());
}
log.trace("TransactionalId: {}. About to wait for {}ms before trying to send another transactional request.",
transactionManager.transactionalId(), retryBackoffMs);
@@ -364,6 +360,13 @@ public class Sender implements Runnable {
return true;
}
+ private void maybeAbortBatches(RuntimeException exception) {
+ if (accumulator.hasUnflushedBatches()) {
+ log.error("Aborting producer batches due to fatal error", exception);
+ accumulator.abortBatches(exception);
+ }
+ }
+
/**
* Start closing the sender (won't actually complete until all data is sent out)
*/
@@ -383,7 +386,7 @@ public class Sender implements Runnable {
initiateClose();
}
- private ClientResponse sendAndAwaitInitPidRequest(Node node) throws IOException {
+ private ClientResponse sendAndAwaitInitProducerIdRequest(Node node) throws IOException {
String nodeId = node.idString();
InitProducerIdRequest.Builder builder = new InitProducerIdRequest.Builder(null);
ClientRequest request = client.newClientRequest(nodeId, builder, time.milliseconds(), true, null);
@@ -399,43 +402,37 @@ public class Sender implements Runnable {
}
private void maybeWaitForProducerId() {
- // If this is a transactional producer, the producer id will be received when recovering transactions in the
- // initTransactions() method of the producer.
- if (transactionManager == null || transactionManager.isTransactional())
- return;
-
while (!transactionManager.hasProducerId() && !transactionManager.isInErrorState()) {
try {
Node node = awaitLeastLoadedNodeReady(requestTimeout);
if (node != null) {
- ClientResponse response = sendAndAwaitInitPidRequest(node);
-
- if (response.hasResponse() && (response.responseBody() instanceof InitProducerIdResponse)) {
- InitProducerIdResponse initProducerIdResponse = (InitProducerIdResponse) response.responseBody();
- Exception exception = initProducerIdResponse.error().exception();
- if (exception != null && !(exception instanceof RetriableException)) {
- transactionManager.setError(exception);
- return;
- }
+ ClientResponse response = sendAndAwaitInitProducerIdRequest(node);
+ InitProducerIdResponse initProducerIdResponse = (InitProducerIdResponse) response.responseBody();
+ Errors error = initProducerIdResponse.error();
+ if (error == Errors.NONE) {
ProducerIdAndEpoch producerIdAndEpoch = new ProducerIdAndEpoch(
initProducerIdResponse.producerId(), initProducerIdResponse.epoch());
transactionManager.setProducerIdAndEpoch(producerIdAndEpoch);
+ } else if (error.exception() instanceof RetriableException) {
+ log.debug("Retriable error from InitProducerId response", error.message());
} else {
- log.error("Received an unexpected response type for an InitProducerIdRequest from {}. " +
- "We will back off and try again.", node);
+ transactionManager.transitionToFatalError(error.exception());
+ break;
}
} else {
log.debug("Could not find an available broker to send InitProducerIdRequest to. " +
"We will back off and try again.");
}
- } catch (Exception e) {
- log.warn("Received an exception while trying to get a producer id. Will back off and retry.", e);
+ } catch (UnsupportedVersionException e) {
+ transactionManager.transitionToFatalError(e);
+ break;
+ } catch (IOException e) {
+ log.debug("Broker {} disconnected while awaiting InitProducerId response", e);
}
log.trace("Retry InitProducerIdRequest in {}ms.", retryBackoffMs);
time.sleep(retryBackoffMs);
metadata.requestUpdate();
}
-
}
/**
@@ -507,9 +504,9 @@ public class Sender implements Runnable {
error);
if (transactionManager == null) {
reenqueueBatch(batch, now);
- } else if (transactionManager.producerIdAndEpoch().producerId == batch.producerId() &&
- transactionManager.producerIdAndEpoch().epoch == batch.producerEpoch()) {
- // If idempotence is enabled only retry the request if the current producer id is the same as the producer id of the batch.
+ } else if (transactionManager.hasProducerIdAndEpoch(batch.producerId(), batch.producerEpoch())) {
+ // If idempotence is enabled only retry the request if the current producer id is the same as
+ // the producer id of the batch.
log.debug("Retrying batch to topic-partition {}. Sequence number : {}", batch.topicPartition,
transactionManager.sequenceNumber(batch.topicPartition));
reenqueueBatch(batch, now);
@@ -523,12 +520,10 @@ public class Sender implements Runnable {
final RuntimeException exception;
if (error == Errors.TOPIC_AUTHORIZATION_FAILED)
exception = new TopicAuthorizationException(batch.topicPartition.topic());
+ else if (error == Errors.CLUSTER_AUTHORIZATION_FAILED)
+ exception = new ClusterAuthorizationException("The producer is not authorized to do idempotent sends");
else
exception = error.exception();
- if (error == Errors.OUT_OF_ORDER_SEQUENCE_NUMBER && batch.producerId() == transactionManager.producerIdAndEpoch().producerId)
- log.error("The broker received an out of order sequence number for correlation id {}, topic-partition " +
- "{} at offset {}. This indicates data loss on the broker, and should be investigated.",
- correlationId, batch.topicPartition, response.baseOffset);
// tell the user the result of their request
failBatch(batch, response, exception);
this.sensors.recordErrors(batch.topicPartition.topic(), batch.recordCount);
@@ -543,12 +538,6 @@ public class Sender implements Runnable {
} else {
completeBatch(batch, response);
- if (transactionManager != null && transactionManager.producerIdAndEpoch().producerId == batch.producerId()
- && transactionManager.producerIdAndEpoch().epoch == batch.producerEpoch()) {
- transactionManager.incrementSequenceNumber(batch.topicPartition, batch.recordCount);
- log.debug("Incremented sequence number for topic-partition {} to {}", batch.topicPartition,
- transactionManager.sequenceNumber(batch.topicPartition));
- }
}
// Unmute the completed partition.
@@ -562,18 +551,38 @@ public class Sender implements Runnable {
}
private void completeBatch(ProducerBatch batch, ProduceResponse.PartitionResponse response) {
+ if (transactionManager != null && transactionManager.hasProducerIdAndEpoch(batch.producerId(), batch.producerEpoch())) {
+ transactionManager.incrementSequenceNumber(batch.topicPartition, batch.recordCount);
+ log.debug("Incremented sequence number for topic-partition {} to {}", batch.topicPartition,
+ transactionManager.sequenceNumber(batch.topicPartition));
+ }
+
batch.done(response.baseOffset, response.logAppendTime, null);
this.accumulator.deallocate(batch);
}
private void failBatch(ProducerBatch batch, ProduceResponse.PartitionResponse response, RuntimeException exception) {
- if (transactionManager != null && !transactionManager.isTransactional()
- && batch.producerId() == transactionManager.producerIdAndEpoch().producerId) {
- // Reset the transaction state since we have hit an irrecoverable exception and cannot make any guarantees
- // about the previously committed message. Note that this will discard the producer id and sequence
- // numbers for all existing partitions.
- transactionManager.resetProducerId();
+ if (transactionManager != null) {
+ if (exception instanceof OutOfOrderSequenceException
+ && !transactionManager.isTransactional()
+ && transactionManager.hasProducerId(batch.producerId())) {
+ log.error("The broker received an out of order sequence number for topic-partition " +
+ "{} at offset {}. This indicates data loss on the broker, and should be investigated.",
+ batch.topicPartition, response.baseOffset);
+
+ // Reset the transaction state since we have hit an irrecoverable exception and cannot make any guarantees
+ // about the previously committed message. Note that this will discard the producer id and sequence
+ // numbers for all existing partitions.
+ transactionManager.resetProducerId();
+ } else if (exception instanceof ClusterAuthorizationException
+ || exception instanceof TransactionalIdAuthorizationException
+ || exception instanceof ProducerFencedException) {
+ transactionManager.transitionToFatalError(exception);
+ } else if (transactionManager.isTransactional()) {
+ transactionManager.transitionToAbortableError(exception);
+ }
}
+
batch.done(response.baseOffset, response.logAppendTime, exception);
this.accumulator.deallocate(batch);
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/38f6cae9/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
index d84a88e..d674697 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
@@ -23,7 +23,7 @@ import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.GroupAuthorizationException;
-import org.apache.kafka.common.errors.ProducerFencedException;
+import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.requests.AbstractResponse;
@@ -77,7 +77,7 @@ public class TransactionManager {
private Node consumerGroupCoordinator;
private volatile State currentState = State.UNINITIALIZED;
- private volatile Exception lastError = null;
+ private volatile RuntimeException lastError = null;
private volatile ProducerIdAndEpoch producerIdAndEpoch;
private enum State {
@@ -87,32 +87,34 @@ public class TransactionManager {
IN_TRANSACTION,
COMMITTING_TRANSACTION,
ABORTING_TRANSACTION,
- FENCED,
- ERROR;
+ ABORTABLE_ERROR,
+ FATAL_ERROR;
private boolean isTransitionValid(State source, State target) {
switch (target) {
case INITIALIZING:
- return source == UNINITIALIZED || source == ERROR;
+ return source == UNINITIALIZED;
case READY:
return source == INITIALIZING || source == COMMITTING_TRANSACTION
- || source == ABORTING_TRANSACTION || source == ERROR;
+ || source == ABORTING_TRANSACTION || source == ABORTABLE_ERROR;
case IN_TRANSACTION:
return source == READY;
case COMMITTING_TRANSACTION:
return source == IN_TRANSACTION;
case ABORTING_TRANSACTION:
- return source == IN_TRANSACTION || source == ERROR;
+ return source == IN_TRANSACTION || source == ABORTABLE_ERROR;
+ case ABORTABLE_ERROR:
+ return source == IN_TRANSACTION || source == COMMITTING_TRANSACTION || source == ABORTING_TRANSACTION;
+ case FATAL_ERROR:
default:
- // We can transition to FENCED or ERROR unconditionally.
- // FENCED is never a valid starting state for any transition. So the only option is to close the
+ // We can transition to FATAL_ERROR unconditionally.
+ // FATAL_ERROR is never a valid starting state for any transition. So the only option is to close the
// producer or do purely non transactional requests.
return true;
}
}
}
-
// We use the priority to determine the order in which requests need to be sent out. For instance, if we have
// a pending FindCoordinator request, that must always go first. Next, If we need a producer id, that must go second.
// The endTxn request must always go last.
@@ -149,7 +151,7 @@ public class TransactionManager {
}
TransactionManager() {
- this("", 0);
+ this(null, 0);
}
public synchronized TransactionalRequestResult initializeTransactions() {
@@ -178,8 +180,8 @@ public class TransactionManager {
public synchronized TransactionalRequestResult beginAbortingTransaction() {
ensureTransactional();
- if (isFenced())
- throw Errors.INVALID_PRODUCER_EPOCH.exception();
+ if (currentState != State.ABORTABLE_ERROR)
+ maybeFailWithError();
transitionTo(State.ABORTING_TRANSACTION);
return beginCompletingTransaction(false);
}
@@ -213,12 +215,16 @@ public class TransactionManager {
}
public synchronized void maybeAddPartitionToTransaction(TopicPartition topicPartition) {
- if (!isInTransaction() || partitionsInTransaction.contains(topicPartition))
+ if (!isInTransaction())
+ throw new IllegalArgumentException("Cannot add partitions to a transaction in state " + currentState);
+
+ if (partitionsInTransaction.contains(topicPartition))
return;
+
newPartitionsToBeAddedToTransaction.add(topicPartition);
}
- public Exception lastError() {
+ public RuntimeException lastError() {
return lastError;
}
@@ -231,11 +237,7 @@ public class TransactionManager {
}
public boolean isTransactional() {
- return transactionalId != null && !transactionalId.isEmpty();
- }
-
- public boolean isFenced() {
- return currentState == State.FENCED;
+ return transactionalId != null;
}
public boolean isCompletingTransaction() {
@@ -247,31 +249,15 @@ public class TransactionManager {
}
public boolean isInErrorState() {
- return currentState == State.ERROR || currentState == State.FENCED;
- }
-
- public synchronized void setError(Exception exception) {
- if (exception instanceof ProducerFencedException)
- transitionTo(State.FENCED, exception);
- else
- transitionTo(State.ERROR, exception);
- }
-
- boolean maybeTerminateRequestWithError(TxnRequestHandler requestHandler) {
- if (isInErrorState() && requestHandler.isEndTxn()) {
- // We shouldn't terminate abort requests from error states.
- EndTxnHandler endTxnHandler = (EndTxnHandler) requestHandler;
- if (endTxnHandler.builder.result() == TransactionResult.ABORT)
- return false;
- String errorMessage = "Cannot commit transaction because at least one previous transactional request " +
- "was not completed successfully.";
- if (lastError != null)
- requestHandler.fatal(new KafkaException(errorMessage, lastError));
- else
- requestHandler.fatal(new KafkaException(errorMessage));
- return true;
- }
- return false;
+ return currentState == State.ABORTABLE_ERROR || currentState == State.FATAL_ERROR;
+ }
+
+ public synchronized void transitionToAbortableError(RuntimeException exception) {
+ transitionTo(State.ABORTABLE_ERROR, exception);
+ }
+
+ public synchronized void transitionToFatalError(RuntimeException exception) {
+ transitionTo(State.FATAL_ERROR, exception);
}
/**
@@ -284,6 +270,15 @@ public class TransactionManager {
return producerIdAndEpoch;
}
+ boolean hasProducerId(long producerId) {
+ return producerIdAndEpoch.producerId == producerId;
+ }
+
+ boolean hasProducerIdAndEpoch(long producerId, short producerEpoch) {
+ ProducerIdAndEpoch idAndEpoch = this.producerIdAndEpoch;
+ return idAndEpoch.producerId == producerId && idAndEpoch.epoch == producerEpoch;
+ }
+
/**
* Set the producer id and epoch atomically.
*/
@@ -337,26 +332,26 @@ public class TransactionManager {
sequenceNumbers.put(topicPartition, currentSequenceNumber);
}
- boolean hasPendingTransactionalRequests() {
- return !(pendingRequests.isEmpty() && newPartitionsToBeAddedToTransaction.isEmpty());
- }
-
- TxnRequestHandler nextRequestHandler() {
- if (!hasPendingTransactionalRequests())
- return null;
-
+ synchronized TxnRequestHandler nextRequestHandler() {
if (!newPartitionsToBeAddedToTransaction.isEmpty())
pendingRequests.add(addPartitionsToTransactionHandler());
- return pendingRequests.poll();
+ TxnRequestHandler nextRequestHandler = pendingRequests.poll();
+ if (nextRequestHandler != null && maybeTerminateRequestWithError(nextRequestHandler)) {
+ log.trace("TransactionalId: {} -- Not sending transactional request {} because we are in an error state",
+ transactionalId, nextRequestHandler.requestBuilder());
+ return null;
+ }
+
+ return nextRequestHandler;
}
- void retry(TxnRequestHandler request) {
+ synchronized void retry(TxnRequestHandler request) {
request.setRetry();
pendingRequests.add(request);
}
- void reenqueue(TxnRequestHandler request) {
+ synchronized void reenqueue(TxnRequestHandler request) {
pendingRequests.add(request);
}
@@ -406,15 +401,21 @@ public class TransactionManager {
transitionTo(target, null);
}
- private synchronized void transitionTo(State target, Exception error) {
- if (currentState.isTransitionValid(currentState, target)) {
- currentState = target;
- if (target == State.ERROR && error != null)
- lastError = error;
- } else {
+ private synchronized void transitionTo(State target, RuntimeException error) {
+ if (!currentState.isTransitionValid(currentState, target))
throw new KafkaException("Invalid transition attempted from state " + currentState.name() +
" to state " + target.name());
+
+ if (target == State.FATAL_ERROR || target == State.ABORTABLE_ERROR) {
+ if (error == null)
+ throw new IllegalArgumentException("Cannot transition to " + target + " with an null exception");
+ lastError = error;
+ } else {
+ lastError = null;
}
+
+ log.debug("TransactionalId {} -- Transition from state {} to {}", transactionalId, currentState, target);
+ currentState = target;
}
private void ensureTransactional() {
@@ -423,15 +424,23 @@ public class TransactionManager {
}
private void maybeFailWithError() {
- if (isFenced())
- throw Errors.INVALID_PRODUCER_EPOCH.exception();
+ if (isInErrorState())
+ throw new KafkaException("Cannot execute transactional method because we are in an error state", lastError);
+ }
+
+ private boolean maybeTerminateRequestWithError(TxnRequestHandler requestHandler) {
if (isInErrorState()) {
- String errorMessage = "Cannot execute transactional method because we are in an error state.";
- if (lastError != null)
- throw new KafkaException(errorMessage, lastError);
- else
- throw new KafkaException(errorMessage);
+ if (requestHandler instanceof EndTxnHandler) {
+ // we allow abort requests to break out of the error state. The state and the last error
+ // will be cleared when the request returns
+ EndTxnHandler endTxnHandler = (EndTxnHandler) requestHandler;
+ if (endTxnHandler.builder.result() == TransactionResult.ABORT)
+ return false;
+ }
+ requestHandler.fail(lastError);
+ return true;
}
+ return false;
}
private void lookupCoordinator(FindCoordinatorRequest.CoordinatorType type, String coordinatorKey) {
@@ -443,12 +452,11 @@ public class TransactionManager {
transactionCoordinator = null;
break;
default:
- throw new IllegalStateException("Got an invalid coordinator type: " + type);
+ throw new IllegalStateException("Invalid coordinator type: " + type);
}
FindCoordinatorRequest.Builder builder = new FindCoordinatorRequest.Builder(type, coordinatorKey);
- FindCoordinatorHandler request = new FindCoordinatorHandler(builder);
- pendingRequests.add(request);
+ pendingRequests.add(new FindCoordinatorHandler(builder));
}
private void completeTransaction() {
@@ -473,9 +481,8 @@ public class TransactionManager {
CommittedOffset committedOffset = new CommittedOffset(offsetAndMetadata.offset(), offsetAndMetadata.metadata());
pendingTxnOffsetCommits.put(entry.getKey(), committedOffset);
}
- TxnOffsetCommitRequest.Builder builder = new TxnOffsetCommitRequest.Builder(consumerGroupId,
- producerIdAndEpoch.producerId, producerIdAndEpoch.epoch,
- pendingTxnOffsetCommits);
+ TxnOffsetCommitRequest.Builder builder = new TxnOffsetCommitRequest.Builder(transactionalId, consumerGroupId,
+ producerIdAndEpoch.producerId, producerIdAndEpoch.epoch, pendingTxnOffsetCommits);
return new TxnOffsetCommitHandler(result, builder);
}
@@ -491,19 +498,20 @@ public class TransactionManager {
this(new TransactionalRequestResult());
}
- void fatal(RuntimeException e) {
+ void fatalError(RuntimeException e) {
+ result.setError(e);
+ transitionToFatalError(e);
+ result.done();
+ }
+
+ void abortableError(RuntimeException e) {
result.setError(e);
- transitionTo(State.ERROR, e);
+ transitionToAbortableError(e);
result.done();
}
- void fenced() {
- log.error("Producer has become invalid, which typically means another producer with the same " +
- "transactional.id has been started: producerId: {}. epoch: {}.",
- producerIdAndEpoch.producerId, producerIdAndEpoch.epoch);
- result.setError(Errors.INVALID_PRODUCER_EPOCH.exception());
- lastError = Errors.INVALID_PRODUCER_EPOCH.exception();
- transitionTo(State.FENCED, Errors.INVALID_PRODUCER_EPOCH.exception());
+ void fail(RuntimeException e) {
+ result.setError(e);
result.done();
}
@@ -516,19 +524,19 @@ public class TransactionManager {
@SuppressWarnings("unchecked")
public void onComplete(ClientResponse response) {
if (response.requestHeader().correlationId() != inFlightRequestCorrelationId) {
- fatal(new RuntimeException("Detected more than one in-flight transactional request."));
+ fatalError(new RuntimeException("Detected more than one in-flight transactional request."));
} else {
clearInFlightRequestCorrelationId();
if (response.wasDisconnected()) {
log.trace("disconnected from " + response.destination() + ". Will retry.");
reenqueue();
} else if (response.versionMismatch() != null) {
- fatal(response.versionMismatch());
+ fatalError(response.versionMismatch());
} else if (response.hasResponse()) {
log.trace("Got transactional response for request:" + requestBuilder());
handleResponse(response.responseBody());
} else {
- fatal(new KafkaException("Could not execute transactional request for unknown reasons"));
+ fatalError(new KafkaException("Could not execute transactional request for unknown reasons"));
}
}
}
@@ -585,6 +593,10 @@ public class TransactionManager {
public void handleResponse(AbstractResponse response) {
InitProducerIdResponse initProducerIdResponse = (InitProducerIdResponse) response;
Errors error = initProducerIdResponse.error();
+
+ log.debug("TransactionalId {} -- Received InitProducerId response with error {}",
+ transactionalId, error);
+
if (error == Errors.NONE) {
ProducerIdAndEpoch producerIdAndEpoch = new ProducerIdAndEpoch(initProducerIdResponse.producerId(), initProducerIdResponse.epoch());
setProducerIdAndEpoch(producerIdAndEpoch);
@@ -597,9 +609,9 @@ public class TransactionManager {
} else if (error == Errors.COORDINATOR_LOAD_IN_PROGRESS || error == Errors.CONCURRENT_TRANSACTIONS) {
reenqueue();
} else if (error == Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED) {
- fatal(error.exception());
+ fatalError(error.exception());
} else {
- fatal(new KafkaException("Unexpected error in InitProducerIdResponse; " + error.message()));
+ fatalError(new KafkaException("Unexpected error in InitProducerIdResponse; " + error.message()));
}
}
}
@@ -626,6 +638,11 @@ public class TransactionManager {
AddPartitionsToTxnResponse addPartitionsToTxnResponse = (AddPartitionsToTxnResponse) response;
Map<TopicPartition, Errors> errors = addPartitionsToTxnResponse.errors();
boolean hasPartitionErrors = false;
+ Set<String> unauthorizedTopics = new HashSet<>();
+
+ log.debug("TransactionalId {} -- Received AddPartitionsToTxn response with errors {}",
+ transactionalId, errors);
+
for (TopicPartition topicPartition : pendingPartitionsToBeAddedToTransaction) {
final Errors error = errors.get(topicPartition);
if (error == Errors.NONE || error == null) {
@@ -640,23 +657,28 @@ public class TransactionManager {
reenqueue();
return;
} else if (error == Errors.INVALID_PRODUCER_EPOCH) {
- fenced();
+ fatalError(error.exception());
return;
} else if (error == Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED) {
- fatal(error.exception());
+ fatalError(error.exception());
return;
} else if (error == Errors.INVALID_PRODUCER_ID_MAPPING
|| error == Errors.INVALID_TXN_STATE) {
- fatal(new KafkaException(error.exception()));
+ fatalError(new KafkaException(error.exception()));
return;
+ } else if (error == Errors.TOPIC_AUTHORIZATION_FAILED) {
+ unauthorizedTopics.add(topicPartition.topic());
} else {
- log.error("Could not add partitions to transaction due to partition error. partition={}, error={}", topicPartition, error);
+ log.error("TransactionalId: {} -- Could not add partition {} due to unexpected error {}",
+ transactionalId, topicPartition, error);
hasPartitionErrors = true;
}
}
- if (hasPartitionErrors) {
- fatal(new KafkaException("Could not add partitions to transaction due to partition level errors"));
+ if (!unauthorizedTopics.isEmpty()) {
+ abortableError(new TopicAuthorizationException(unauthorizedTopics));
+ } else if (hasPartitionErrors) {
+ abortableError(new KafkaException("Could not add partitions to transaction due to partition level errors"));
} else {
partitionsInTransaction.addAll(pendingPartitionsToBeAddedToTransaction);
pendingPartitionsToBeAddedToTransaction.clear();
@@ -695,7 +717,12 @@ public class TransactionManager {
@Override
public void handleResponse(AbstractResponse response) {
FindCoordinatorResponse findCoordinatorResponse = (FindCoordinatorResponse) response;
- if (findCoordinatorResponse.error() == Errors.NONE) {
+ Errors error = findCoordinatorResponse.error();
+
+ log.debug("TransactionalId {} -- Received FindCoordinator response with error {}",
+ transactionalId, error);
+
+ if (error == Errors.NONE) {
Node node = findCoordinatorResponse.node();
switch (builder.coordinatorType()) {
case GROUP:
@@ -705,12 +732,14 @@ public class TransactionManager {
transactionCoordinator = node;
}
result.done();
- } else if (findCoordinatorResponse.error() == Errors.COORDINATOR_NOT_AVAILABLE) {
+ } else if (error == Errors.COORDINATOR_NOT_AVAILABLE) {
reenqueue();
+ } else if (error == Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED) {
+ fatalError(error.exception());
} else if (findCoordinatorResponse.error() == Errors.GROUP_AUTHORIZATION_FAILED) {
- fatal(new GroupAuthorizationException("Not authorized to commit offsets " + builder.coordinatorKey()));
+ abortableError(new GroupAuthorizationException(builder.coordinatorKey()));
} else {
- fatal(new KafkaException(String.format("Could not find a coordinator with type %s with key %s due to" +
+ fatalError(new KafkaException(String.format("Could not find a coordinator with type %s with key %s due to" +
"unexpected error: %s", builder.coordinatorType(), builder.coordinatorKey(),
findCoordinatorResponse.error().message())));
}
@@ -743,6 +772,10 @@ public class TransactionManager {
public void handleResponse(AbstractResponse response) {
EndTxnResponse endTxnResponse = (EndTxnResponse) response;
Errors error = endTxnResponse.error();
+
+ log.debug("TransactionalId {} -- Received EndTxn response with error {}",
+ transactionalId, error);
+
if (error == Errors.NONE) {
completeTransaction();
result.done();
@@ -752,11 +785,13 @@ public class TransactionManager {
} else if (error == Errors.COORDINATOR_LOAD_IN_PROGRESS || error == Errors.CONCURRENT_TRANSACTIONS) {
reenqueue();
} else if (error == Errors.INVALID_PRODUCER_EPOCH) {
- fenced();
+ fatalError(error.exception());
} else if (error == Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED) {
- fatal(error.exception());
+ fatalError(error.exception());
+ } else if (error == Errors.INVALID_TXN_STATE) {
+ fatalError(error.exception());
} else {
- fatal(new KafkaException("Unhandled error in EndTxnResponse: " + error.message()));
+ fatalError(new KafkaException("Unhandled error in EndTxnResponse: " + error.message()));
}
}
}
@@ -785,6 +820,10 @@ public class TransactionManager {
public void handleResponse(AbstractResponse response) {
AddOffsetsToTxnResponse addOffsetsToTxnResponse = (AddOffsetsToTxnResponse) response;
Errors error = addOffsetsToTxnResponse.error();
+
+ log.debug("TransactionalId {} -- Received AddOffsetsToTxn response with error {}",
+ transactionalId, error);
+
if (error == Errors.NONE) {
// note the result is not completed until the TxnOffsetCommit returns
pendingRequests.add(txnOffsetCommitHandler(result, offsets, builder.consumerGroupId()));
@@ -794,11 +833,13 @@ public class TransactionManager {
} else if (error == Errors.COORDINATOR_LOAD_IN_PROGRESS || error == Errors.CONCURRENT_TRANSACTIONS) {
reenqueue();
} else if (error == Errors.INVALID_PRODUCER_EPOCH) {
- fenced();
+ fatalError(error.exception());
} else if (error == Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED) {
- fatal(error.exception());
+ fatalError(error.exception());
+ } else if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
+ abortableError(new GroupAuthorizationException(builder.consumerGroupId()));
} else {
- fatal(new KafkaException("Unexpected error in AddOffsetsToTxnResponse: " + error.message()));
+ fatalError(new KafkaException("Unexpected error in AddOffsetsToTxnResponse: " + error.message()));
}
}
}
@@ -837,7 +878,12 @@ public class TransactionManager {
TxnOffsetCommitResponse txnOffsetCommitResponse = (TxnOffsetCommitResponse) response;
boolean coordinatorReloaded = false;
boolean hadFailure = false;
- for (Map.Entry<TopicPartition, Errors> entry : txnOffsetCommitResponse.errors().entrySet()) {
+ Map<TopicPartition, Errors> errors = txnOffsetCommitResponse.errors();
+
+ log.debug("TransactionalId {} -- Received TxnOffsetCommit response with errors {}",
+ transactionalId, errors);
+
+ for (Map.Entry<TopicPartition, Errors> entry : errors.entrySet()) {
TopicPartition topicPartition = entry.getKey();
Errors error = entry.getValue();
if (error == Errors.NONE) {
@@ -850,11 +896,17 @@ public class TransactionManager {
}
} else if (error == Errors.UNKNOWN_TOPIC_OR_PARTITION) {
hadFailure = true;
+ } else if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
+ abortableError(new GroupAuthorizationException(builder.consumerGroupId()));
+ return;
+ } else if (error == Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED) {
+ fatalError(error.exception());
+ return;
} else if (error == Errors.INVALID_PRODUCER_EPOCH) {
- fenced();
+ fatalError(error.exception());
return;
} else {
- fatal(new KafkaException("Unexpected error in TxnOffsetCommitResponse: " + error.message()));
+ fatalError(new KafkaException("Unexpected error in TxnOffsetCommitResponse: " + error.message()));
return;
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/38f6cae9/clients/src/main/java/org/apache/kafka/common/errors/ProducerIdAuthorizationException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/ProducerIdAuthorizationException.java b/clients/src/main/java/org/apache/kafka/common/errors/ProducerIdAuthorizationException.java
deleted file mode 100644
index 2da9158..0000000
--- a/clients/src/main/java/org/apache/kafka/common/errors/ProducerIdAuthorizationException.java
+++ /dev/null
@@ -1,23 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.common.errors;
-
-public class ProducerIdAuthorizationException extends ApiException {
- public ProducerIdAuthorizationException(final String message) {
- super(message);
- }
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/38f6cae9/clients/src/main/java/org/apache/kafka/common/errors/TransactionalIdAuthorizationException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/TransactionalIdAuthorizationException.java b/clients/src/main/java/org/apache/kafka/common/errors/TransactionalIdAuthorizationException.java
index 9bf1fbb..3f85513 100644
--- a/clients/src/main/java/org/apache/kafka/common/errors/TransactionalIdAuthorizationException.java
+++ b/clients/src/main/java/org/apache/kafka/common/errors/TransactionalIdAuthorizationException.java
@@ -16,7 +16,7 @@
*/
package org.apache.kafka.common.errors;
-public class TransactionalIdAuthorizationException extends ApiException {
+public class TransactionalIdAuthorizationException extends AuthorizationException {
public TransactionalIdAuthorizationException(final String message) {
super(message);
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/38f6cae9/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
index f94fb4d..9444eb5 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
@@ -57,7 +57,6 @@ import org.apache.kafka.common.errors.OffsetOutOfRangeException;
import org.apache.kafka.common.errors.OutOfOrderSequenceException;
import org.apache.kafka.common.errors.PolicyViolationException;
import org.apache.kafka.common.errors.ProducerFencedException;
-import org.apache.kafka.common.errors.ProducerIdAuthorizationException;
import org.apache.kafka.common.errors.RebalanceInProgressException;
import org.apache.kafka.common.errors.RecordBatchTooLargeException;
import org.apache.kafka.common.errors.RecordTooLargeException;
@@ -483,21 +482,13 @@ public enum Errors {
return new TransactionalIdAuthorizationException(message);
}
}),
- PRODUCER_ID_AUTHORIZATION_FAILED(54, "Producer is not authorized to use producer Ids, " +
- "which is required to write idempotent data.",
- new ApiExceptionBuilder() {
- @Override
- public ApiException build(String message) {
- return new ProducerIdAuthorizationException(message);
- }
- }),
- SECURITY_DISABLED(55, "Security features are disabled.", new ApiExceptionBuilder() {
+ SECURITY_DISABLED(54, "Security features are disabled.", new ApiExceptionBuilder() {
@Override
public ApiException build(String message) {
return new SecurityDisabledException(message);
}
}),
- BROKER_AUTHORIZATION_FAILED(56, "Broker authorization failed", new ApiExceptionBuilder() {
+ BROKER_AUTHORIZATION_FAILED(55, "Broker authorization failed", new ApiExceptionBuilder() {
@Override
public ApiException build(String message) {
return new BrokerAuthorizationException(message);
http://git-wip-us.apache.org/repos/asf/kafka/blob/38f6cae9/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
index d5ce469..91391e9 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
@@ -1516,6 +1516,9 @@ public class Protocol {
);
public static final Schema TXN_OFFSET_COMMIT_REQUEST_V0 = new Schema(
+ new Field("transactional_id",
+ STRING,
+ "The transactional id corresponding to the transaction."),
new Field("consumer_group_id",
STRING,
"Id of the associated consumer group to commit offsets for."),
http://git-wip-us.apache.org/repos/asf/kafka/blob/38f6cae9/clients/src/main/java/org/apache/kafka/common/requests/AddOffsetsToTxnRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AddOffsetsToTxnRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/AddOffsetsToTxnRequest.java
index 4bf8b3e..3339470 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AddOffsetsToTxnRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AddOffsetsToTxnRequest.java
@@ -50,6 +50,17 @@ public class AddOffsetsToTxnRequest extends AbstractRequest {
public AddOffsetsToTxnRequest build(short version) {
return new AddOffsetsToTxnRequest(version, transactionalId, producerId, producerEpoch, consumerGroupId);
}
+
+ @Override
+ public String toString() {
+ StringBuilder bld = new StringBuilder();
+ bld.append("(transactionalId=").append(transactionalId).
+ append(", producerId=").append(producerId).
+ append(", producerEpoch=").append(producerEpoch).
+ append(", consumerGroupId=").append(consumerGroupId).
+ append(")");
+ return bld.toString();
+ }
}
private final String transactionalId;
http://git-wip-us.apache.org/repos/asf/kafka/blob/38f6cae9/clients/src/main/java/org/apache/kafka/common/requests/AddOffsetsToTxnResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AddOffsetsToTxnResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AddOffsetsToTxnResponse.java
index 8b3a589..754f242 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AddOffsetsToTxnResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AddOffsetsToTxnResponse.java
@@ -30,10 +30,11 @@ public class AddOffsetsToTxnResponse extends AbstractResponse {
// NotCoordinator
// CoordinatorNotAvailable
// CoordinatorLoadInProgress
- // InvalidPidMapping
+ // InvalidProducerIdMapping
+ // InvalidProducerEpoch
// InvalidTxnState
// GroupAuthorizationFailed
- // InvalidProducerEpoch
+ // TransactionalIdAuthorizationFailed
private final Errors error;
private final int throttleTimeMs;
http://git-wip-us.apache.org/repos/asf/kafka/blob/38f6cae9/clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnRequest.java
index 148ebec..e24fa5a 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnRequest.java
@@ -54,6 +54,17 @@ public class AddPartitionsToTxnRequest extends AbstractRequest {
public AddPartitionsToTxnRequest build(short version) {
return new AddPartitionsToTxnRequest(version, transactionalId, producerId, producerEpoch, partitions);
}
+
+ @Override
+ public String toString() {
+ StringBuilder bld = new StringBuilder();
+ bld.append("(transactionalId=").append(transactionalId).
+ append(", producerId=").append(producerId).
+ append(", producerEpoch=").append(producerEpoch).
+ append(", partitions=").append(partitions).
+ append(")");
+ return bld.toString();
+ }
}
private final String transactionalId;
http://git-wip-us.apache.org/repos/asf/kafka/blob/38f6cae9/clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnResponse.java
index 697142b..39172ee 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnResponse.java
@@ -43,11 +43,12 @@ public class AddPartitionsToTxnResponse extends AbstractResponse {
// CoordinatorNotAvailable
// CoordinatorLoadInProgress
// InvalidTxnState
- // InvalidPidMapping
+ // InvalidProducerIdMapping
// TopicAuthorizationFailed
// InvalidProducerEpoch
// UnknownTopicOrPartition
// TopicAuthorizationFailed
+ // TransactionalIdAuthorizationFailed
private final Map<TopicPartition, Errors> errors;
public AddPartitionsToTxnResponse(int throttleTimeMs, Map<TopicPartition, Errors> errors) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/38f6cae9/clients/src/main/java/org/apache/kafka/common/requests/EndTxnRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/EndTxnRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/EndTxnRequest.java
index 77ec137..b9f052c 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/EndTxnRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/EndTxnRequest.java
@@ -50,6 +50,17 @@ public class EndTxnRequest extends AbstractRequest {
public EndTxnRequest build(short version) {
return new EndTxnRequest(version, transactionalId, producerId, producerEpoch, result);
}
+
+ @Override
+ public String toString() {
+ StringBuilder bld = new StringBuilder();
+ bld.append("(transactionalId=").append(transactionalId).
+ append(", producerId=").append(producerId).
+ append(", producerEpoch=").append(producerEpoch).
+ append(", result=").append(result).
+ append(")");
+ return bld.toString();
+ }
}
private final String transactionalId;
http://git-wip-us.apache.org/repos/asf/kafka/blob/38f6cae9/clients/src/main/java/org/apache/kafka/common/requests/EndTxnResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/EndTxnResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/EndTxnResponse.java
index 99e4e8c..17cf68d 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/EndTxnResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/EndTxnResponse.java
@@ -31,8 +31,9 @@ public class EndTxnResponse extends AbstractResponse {
// CoordinatorNotAvailable
// CoordinatorLoadInProgress
// InvalidTxnState
- // InvalidPidMapping
+ // InvalidProducerIdMapping
// InvalidProducerEpoch
+ // TransactionalIdAuthorizationFailed
private final Errors error;
private final int throttleTimeMs;
http://git-wip-us.apache.org/repos/asf/kafka/blob/38f6cae9/clients/src/main/java/org/apache/kafka/common/requests/InitProducerIdResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/InitProducerIdResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/InitProducerIdResponse.java
index 7c8a6e5..96e1cdf 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/InitProducerIdResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/InitProducerIdResponse.java
@@ -24,11 +24,13 @@ import org.apache.kafka.common.record.RecordBatch;
import java.nio.ByteBuffer;
public class InitProducerIdResponse extends AbstractResponse {
- /**
- * Possible Error codes:
- * OK
- *
- */
+ // Possible error codes:
+ // NotCoordinator
+ // CoordinatorNotAvailable
+ // CoordinatorLoadInProgress
+ // TransactionalIdAuthorizationFailed
+ // ClusterAuthorizationFailed
+
private static final String THROTTLE_TIME_KEY_NAME = "throttle_time_ms";
private static final String PRODUCER_ID_KEY_NAME = "producer_id";
private static final String EPOCH_KEY_NAME = "producer_epoch";
http://git-wip-us.apache.org/repos/asf/kafka/blob/38f6cae9/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java
index 3377f91..3d696c1 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java
@@ -228,13 +228,14 @@ public class ProduceRequest extends AbstractRequest {
}
@Override
- public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
+ public ProduceResponse getErrorResponse(int throttleTimeMs, Throwable e) {
/* In case the producer doesn't actually want any response */
if (acks == 0)
return null;
+ Errors error = Errors.forException(e);
Map<TopicPartition, ProduceResponse.PartitionResponse> responseMap = new HashMap<>();
- ProduceResponse.PartitionResponse partitionResponse = new ProduceResponse.PartitionResponse(Errors.forException(e));
+ ProduceResponse.PartitionResponse partitionResponse = new ProduceResponse.PartitionResponse(error);
for (TopicPartition tp : partitions())
responseMap.put(tp, partitionResponse);
http://git-wip-us.apache.org/repos/asf/kafka/blob/38f6cae9/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java
index 42ae434..55332f6 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java
@@ -61,6 +61,9 @@ public class ProduceResponse extends AbstractResponse {
* INVALID_REQUIRED_ACKS (21)
* TOPIC_AUTHORIZATION_FAILED (29)
* UNSUPPORTED_FOR_MESSAGE_FORMAT (43)
+ * INVALID_PRODUCER_EPOCH (47)
+ * CLUSTER_AUTHORIZATION_FAILED (31)
+ * TRANSACTIONAL_ID_AUTHORIZATION_FAILED (53)
*/
private static final String BASE_OFFSET_KEY_NAME = "base_offset";
http://git-wip-us.apache.org/repos/asf/kafka/blob/38f6cae9/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitRequest.java
index f5334f2..68fa3d2 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitRequest.java
@@ -27,6 +27,7 @@ import java.util.HashMap;
import java.util.Map;
public class TxnOffsetCommitRequest extends AbstractRequest {
+ private static final String TRANSACTIONAL_ID_KEY_NAME = "transactional_id";
private static final String CONSUMER_GROUP_ID_KEY_NAME = "consumer_group_id";
private static final String PRODUCER_ID_KEY_NAME = "producer_id";
private static final String PRODUCER_EPOCH_KEY_NAME = "producer_epoch";
@@ -38,14 +39,16 @@ public class TxnOffsetCommitRequest extends AbstractRequest {
private static final String METADATA_KEY_NAME = "metadata";
public static class Builder extends AbstractRequest.Builder<TxnOffsetCommitRequest> {
+ private final String transactionalId;
private final String consumerGroupId;
private final long producerId;
private final short producerEpoch;
private final Map<TopicPartition, CommittedOffset> offsets;
- public Builder(String consumerGroupId, long producerId, short producerEpoch,
+ public Builder(String transactionalId, String consumerGroupId, long producerId, short producerEpoch,
Map<TopicPartition, CommittedOffset> offsets) {
super(ApiKeys.TXN_OFFSET_COMMIT);
+ this.transactionalId = transactionalId;
this.consumerGroupId = consumerGroupId;
this.producerId = producerId;
this.producerEpoch = producerEpoch;
@@ -58,18 +61,32 @@ public class TxnOffsetCommitRequest extends AbstractRequest {
@Override
public TxnOffsetCommitRequest build(short version) {
- return new TxnOffsetCommitRequest(version, consumerGroupId, producerId, producerEpoch, offsets);
+ return new TxnOffsetCommitRequest(version, transactionalId, consumerGroupId, producerId, producerEpoch, offsets);
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder bld = new StringBuilder();
+ bld.append("(transactionalId=").append(transactionalId).
+ append(", producerId=").append(producerId).
+ append(", producerEpoch=").append(producerEpoch).
+ append(", consumerGroupId=").append(consumerGroupId).
+ append(", offsets=").append(offsets).
+ append(")");
+ return bld.toString();
}
}
+ private final String transactionalId;
private final String consumerGroupId;
private final long producerId;
private final short producerEpoch;
private final Map<TopicPartition, CommittedOffset> offsets;
- public TxnOffsetCommitRequest(short version, String consumerGroupId, long producerId, short producerEpoch,
- Map<TopicPartition, CommittedOffset> offsets) {
+ public TxnOffsetCommitRequest(short version, String transactionalId, String consumerGroupId, long producerId,
+ short producerEpoch, Map<TopicPartition, CommittedOffset> offsets) {
super(version);
+ this.transactionalId = transactionalId;
this.consumerGroupId = consumerGroupId;
this.producerId = producerId;
this.producerEpoch = producerEpoch;
@@ -78,6 +95,7 @@ public class TxnOffsetCommitRequest extends AbstractRequest {
public TxnOffsetCommitRequest(Struct struct, short version) {
super(version);
+ this.transactionalId = struct.getString(TRANSACTIONAL_ID_KEY_NAME);
this.consumerGroupId = struct.getString(CONSUMER_GROUP_ID_KEY_NAME);
this.producerId = struct.getLong(PRODUCER_ID_KEY_NAME);
this.producerEpoch = struct.getShort(PRODUCER_EPOCH_KEY_NAME);
@@ -98,6 +116,10 @@ public class TxnOffsetCommitRequest extends AbstractRequest {
this.offsets = offsets;
}
+ public String transactionalId() {
+ return transactionalId;
+ }
+
public String consumerGroupId() {
return consumerGroupId;
}
@@ -117,6 +139,7 @@ public class TxnOffsetCommitRequest extends AbstractRequest {
@Override
protected Struct toStruct() {
Struct struct = new Struct(ApiKeys.TXN_OFFSET_COMMIT.requestSchema(version()));
+ struct.set(TRANSACTIONAL_ID_KEY_NAME, transactionalId);
struct.set(CONSUMER_GROUP_ID_KEY_NAME, consumerGroupId);
struct.set(PRODUCER_ID_KEY_NAME, producerId);
struct.set(PRODUCER_EPOCH_KEY_NAME, producerEpoch);
http://git-wip-us.apache.org/repos/asf/kafka/blob/38f6cae9/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitResponse.java
index 37b9a50..a62568f 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitResponse.java
@@ -42,6 +42,7 @@ public class TxnOffsetCommitResponse extends AbstractResponse {
// OffsetMetadataTooLarge
// GroupAuthorizationFailed
// InvalidCommitOffsetSize
+ // TransactionalIdAuthorizationFailed
private final Map<TopicPartition, Errors> errors;
private final int throttleTimeMs;
http://git-wip-us.apache.org/repos/asf/kafka/blob/38f6cae9/clients/src/main/java/org/apache/kafka/common/requests/WriteTxnMarkersResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/WriteTxnMarkersResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/WriteTxnMarkersResponse.java
index 06f6662..ddddc42 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/WriteTxnMarkersResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/WriteTxnMarkersResponse.java
@@ -48,6 +48,7 @@ public class WriteTxnMarkersResponse extends AbstractResponse {
// InvalidRequiredAcks
// TransactionCoordinatorFenced
// RequestTimeout
+ // ClusterAuthorizationFailed
private final Map<Long, Map<TopicPartition, Errors>> errors;
http://git-wip-us.apache.org/repos/asf/kafka/blob/38f6cae9/clients/src/test/java/org/apache/kafka/clients/admin/AclOperationTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/AclOperationTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/AclOperationTest.java
index 06ace63..0e3441f 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/AclOperationTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/AclOperationTest.java
@@ -47,7 +47,8 @@ public class AclOperationTest {
new AclOperationTestInfo(AclOperation.DESCRIBE, 8, "describe", false),
new AclOperationTestInfo(AclOperation.CLUSTER_ACTION, 9, "cluster_action", false),
new AclOperationTestInfo(AclOperation.DESCRIBE_CONFIGS, 10, "describe_configs", false),
- new AclOperationTestInfo(AclOperation.ALTER_CONFIGS, 11, "alter_configs", false)
+ new AclOperationTestInfo(AclOperation.ALTER_CONFIGS, 11, "alter_configs", false),
+ new AclOperationTestInfo(AclOperation.IDEMPOTENT_WRITE, 12, "idempotent_write", false)
};
@Test