You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/06/22 19:41:45 UTC

[GitHub] [kafka] abbccdda opened a new pull request #8725: KAFKA-9608: Transaction Event Simulation Test

abbccdda opened a new pull request #8725:
URL: https://github.com/apache/kafka/pull/8725


   Start the basic template for transaction event simulation.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] hachikuji commented on a change in pull request #8725: KAFKA-9608: Transaction Event Simulation Test

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #8725:
URL: https://github.com/apache/kafka/pull/8725#discussion_r490627471



##########
File path: clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionEventSimulationTest.java
##########
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.producer.internals;
+
+import org.apache.kafka.clients.ApiVersions;
+import org.apache.kafka.clients.MockClient;
+import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.internals.ClusterResourceListeners;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.record.Record;
+import org.apache.kafka.common.requests.FindCoordinatorRequest;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.test.TestUtils;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.Random;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * This test tries to test out the EOS robustness on the client side. It features a {@link TransactionSimulationCoordinator}
+ * which handles the incoming transactional produce/metadata requests and gives basic feedback through {@link MockClient}.
+ *
+ * Each iteration the transaction manager will append one record through accumulator and commit offset at the same time. The request
+ * being transmitted is not guaranteed to be processed or processed correctly, so a state checking loop is enforced to make the client
+ * and the coordinator interact with each other and ensure the state could be eventually clean using {@link TransactionManager#isReady}.
+ * By the end of the test we will check whether all the committed transactions are successfully materialized on the coordinator side.
+ *
+ * Features supported:
+ * 
+ * 1. Randomly abort transaction
+ * 2. Fault injection on response
+ * 3. Random message drop
+ */
+public class TransactionEventSimulationTest {
+
+    private TransactionManager transactionManager;
+    private TransactionSimulationCoordinator transactionCoordinator;
+    private Sender sender;
+    private final LogContext logContext = new LogContext();
+
+    private final MockTime time = new MockTime();
+    private final int requestTimeoutMs = 100;
+    private final int retryBackOffMs = 0;
+    private final long apiVersion = 0L;
+
+    private ProducerMetadata metadata = new ProducerMetadata(0, Long.MAX_VALUE, 10,
+        new LogContext(), new ClusterResourceListeners(), time);
+    private MockClient client = new MockClient(time, metadata);
+
+    @Before
+    public void setup() {
+        transactionManager = new TransactionManager(logContext, "txn-id",
+            requestTimeoutMs, apiVersion, new ApiVersions(), false);
+        transactionCoordinator = new TransactionSimulationCoordinator(client);
+    }
+
+    @Test
+    public void simulateTxnEvents() throws InterruptedException {
+        final int batchSize = 100;
+        final int lingerMs = 0;
+        final int deliveryTimeoutMs = 10;
+
+        RecordAccumulator accumulator = new RecordAccumulator(logContext, batchSize, CompressionType.GZIP,
+            lingerMs, retryBackOffMs, deliveryTimeoutMs, new Metrics(), "accumulator", time, new ApiVersions(), transactionManager,

Review comment:
       nit: make `Metrics` a field

##########
File path: clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionSimulationCoordinator.java
##########
@@ -0,0 +1,316 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.producer.internals;
+
+import org.apache.kafka.clients.ClientRequest;
+import org.apache.kafka.clients.MockClient;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.message.AddOffsetsToTxnResponseData;
+import org.apache.kafka.common.message.EndTxnResponseData;
+import org.apache.kafka.common.message.FindCoordinatorResponseData;
+import org.apache.kafka.common.message.InitProducerIdResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.common.record.Record;
+import org.apache.kafka.common.requests.AbstractRequest;
+import org.apache.kafka.common.requests.AbstractResponse;
+import org.apache.kafka.common.requests.AddOffsetsToTxnRequest;
+import org.apache.kafka.common.requests.AddOffsetsToTxnResponse;
+import org.apache.kafka.common.requests.AddPartitionsToTxnRequest;
+import org.apache.kafka.common.requests.AddPartitionsToTxnResponse;
+import org.apache.kafka.common.requests.EndTxnRequest;
+import org.apache.kafka.common.requests.EndTxnResponse;
+import org.apache.kafka.common.requests.FindCoordinatorRequest;
+import org.apache.kafka.common.requests.FindCoordinatorResponse;
+import org.apache.kafka.common.requests.InitProducerIdRequest;
+import org.apache.kafka.common.requests.InitProducerIdResponse;
+import org.apache.kafka.common.requests.ProduceRequest;
+import org.apache.kafka.common.requests.ProduceResponse;
+import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse;
+import org.apache.kafka.common.requests.TransactionResult;
+import org.apache.kafka.common.requests.TxnOffsetCommitRequest;
+import org.apache.kafka.common.requests.TxnOffsetCommitResponse;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Random;
+
+import static org.apache.kafka.common.record.RecordBatch.NO_PRODUCER_EPOCH;
+import static org.apache.kafka.common.record.RecordBatch.NO_PRODUCER_ID;
+
+/**
+ * A reduced functionality of a combination of transaction coordinator and group coordinator.
+ * It provides basic event handling from {@link Sender} with transaction turned on.
+ *
+ * Random fault injection is supported as well, which will return a retriable error to
+ * the client.
+ */
+class TransactionSimulationCoordinator {
+
+    private final Map<TopicPartition, List<Record>> pendingPartitionData;
+    private final Map<TopicPartition, Long> pendingOffsets;
+    private boolean offsetsAddedToTxn = false;
+
+    private long currentProducerId = 0L;
+    private short currentEpoch = 0;
+    private Random faultInjectRandom = new Random();
+
+    public Map<TopicPartition, List<Record>> persistentPartitionData() {
+        return persistentPartitionData;
+    }
+
+    public Map<TopicPartition, Long> committedOffsets() {
+        return committedOffsets;
+    }
+
+    private final Map<TopicPartition, List<Record>> persistentPartitionData;
+    private final Map<TopicPartition, Long> committedOffsets;
+
+    private final MockClient networkClient;
+    private final int throttleTimeMs = 10;
+
+    TransactionSimulationCoordinator(MockClient networkClient) {
+        this.networkClient = networkClient;
+        this.pendingPartitionData = new HashMap<>();
+        this.pendingOffsets = new HashMap<>();
+        this.persistentPartitionData = new HashMap<>();
+        this.committedOffsets = new HashMap<>();
+    }
+
+    void runOnce(boolean dropMessage) {
+        Queue<ClientRequest> incomingRequests = networkClient.requests();
+
+        final boolean faultInject = faultInjectRandom.nextBoolean();
+
+        if (incomingRequests.peek() == null) {
+            return;
+        }
+
+        final AbstractResponse response;
+        AbstractRequest nextRequest = incomingRequests.peek().requestBuilder().build();
+        if (nextRequest instanceof FindCoordinatorRequest) {
+            response = handleFindCoordinator(faultInject);
+        } else if (nextRequest instanceof InitProducerIdRequest) {
+            response = handleInitProducerId((InitProducerIdRequest) nextRequest, faultInject);
+        } else if (nextRequest instanceof AddPartitionsToTxnRequest) {
+            response = handleAddPartitionToTxn((AddPartitionsToTxnRequest) nextRequest, faultInject);
+        } else if (nextRequest instanceof AddOffsetsToTxnRequest) {
+            response = handleAddOffsetsToTxn((AddOffsetsToTxnRequest) nextRequest, faultInject);
+        } else if (nextRequest instanceof TxnOffsetCommitRequest) {
+            response = handleTxnCommit((TxnOffsetCommitRequest) nextRequest, faultInject);
+        } else if (nextRequest instanceof ProduceRequest) {
+            response = handleProduce((ProduceRequest) nextRequest, faultInject);
+        } else if (nextRequest instanceof EndTxnRequest) {
+            response = handleEndTxn((EndTxnRequest) nextRequest, faultInject);
+        } else {
+            throw new IllegalArgumentException("Unknown request: " + nextRequest);
+        }
+
+        networkClient.respond(response, dropMessage);
+    }
+
+    private FindCoordinatorResponse handleFindCoordinator(final boolean faultInject) {
+        if (faultInject) {
+            return new FindCoordinatorResponse(
+                new FindCoordinatorResponseData()
+                    .setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code()));
+        } else {
+            return new FindCoordinatorResponse(
+                new FindCoordinatorResponseData()
+                    .setErrorCode(Errors.NONE.code())
+                    .setHost("localhost")
+                    .setNodeId(0)
+                    .setPort(2211)
+            );
+        }
+    }
+
+    private InitProducerIdResponse handleInitProducerId(InitProducerIdRequest request,
+                                                        final boolean faultInject) {
+        if (faultInject) {
+            return new InitProducerIdResponse(
+                new InitProducerIdResponseData()
+                    .setErrorCode(Errors.NOT_COORDINATOR.code())
+            );
+        } else if (request.data.producerId() != NO_PRODUCER_ID &&
+                    request.data.producerId() != currentProducerId) {
+            return new InitProducerIdResponse(
+                new InitProducerIdResponseData()
+                    .setErrorCode(Errors.UNKNOWN_PRODUCER_ID.code())
+                    .setThrottleTimeMs(throttleTimeMs)
+            );
+        } else if (request.data.producerEpoch() != NO_PRODUCER_EPOCH &&
+                    request.data.producerEpoch() != currentEpoch) {
+            return new InitProducerIdResponse(
+                new InitProducerIdResponseData()
+                    .setErrorCode(Errors.INVALID_PRODUCER_EPOCH.code())
+                    .setThrottleTimeMs(throttleTimeMs)
+            );
+        } else {
+            currentProducerId += 1;
+            currentEpoch += 1;
+            return new InitProducerIdResponse(
+                new InitProducerIdResponseData()
+                    .setProducerId(currentProducerId)
+                    .setProducerEpoch(currentEpoch)
+                    .setErrorCode(Errors.NONE.code())
+            );
+        }
+    }
+
+    private AddPartitionsToTxnResponse handleAddPartitionToTxn(AddPartitionsToTxnRequest request,
+                                                               final boolean faultInject) {
+        Map<TopicPartition, Errors> errors = new HashMap<>();
+        request.partitions().forEach(topicPartition -> {
+            if (faultInject) {
+                errors.put(topicPartition, Errors.COORDINATOR_NOT_AVAILABLE);
+            } else if (request.data.producerId() != currentProducerId) {
+                errors.put(topicPartition, Errors.UNKNOWN_PRODUCER_ID);
+            } else if (request.data.producerEpoch() != currentEpoch) {
+                errors.put(topicPartition, Errors.INVALID_PRODUCER_EPOCH);
+            } else {
+                errors.put(topicPartition, Errors.NONE);
+            }
+        });
+
+        return new AddPartitionsToTxnResponse(
+            throttleTimeMs,
+            errors
+        );
+    }
+
+    private AddOffsetsToTxnResponse handleAddOffsetsToTxn(AddOffsetsToTxnRequest request,
+                                                          final boolean faultInject) {
+        if (faultInject) {
+            return new AddOffsetsToTxnResponse(
+                new AddOffsetsToTxnResponseData()
+                    .setErrorCode(Errors.COORDINATOR_LOAD_IN_PROGRESS.code())
+                    .setThrottleTimeMs(throttleTimeMs)
+            );
+        } else if (request.data.producerId() != currentProducerId) {
+            return new AddOffsetsToTxnResponse(
+                new AddOffsetsToTxnResponseData()
+                    .setErrorCode(Errors.UNKNOWN_PRODUCER_ID.code())
+                    .setThrottleTimeMs(throttleTimeMs)
+            );
+        } else if (request.data.producerEpoch() != currentEpoch) {
+            return new AddOffsetsToTxnResponse(
+                new AddOffsetsToTxnResponseData()
+                    .setErrorCode(Errors.INVALID_PRODUCER_EPOCH.code())
+                    .setThrottleTimeMs(throttleTimeMs)
+            );
+        } else {
+            offsetsAddedToTxn = true;
+            return new AddOffsetsToTxnResponse(
+                new AddOffsetsToTxnResponseData()
+                    .setErrorCode(Errors.NONE.code())
+                    .setThrottleTimeMs(throttleTimeMs)
+            );
+        }
+    }
+
+    private AbstractResponse handleTxnCommit(TxnOffsetCommitRequest request,
+                                             final boolean faultInject) {
+        Map<TopicPartition, Errors> errors = new HashMap<>();
+        request.data.topics().forEach(topic -> topic.partitions().forEach(partition -> {
+            TopicPartition key = new TopicPartition(topic.name(), partition.partitionIndex());
+            if (faultInject) {
+                errors.put(key, Errors.COORDINATOR_LOAD_IN_PROGRESS);
+            } else if (request.data.producerId() != currentProducerId) {
+                errors.put(key, Errors.UNKNOWN_PRODUCER_ID);
+            } else if (request.data.producerEpoch() != currentEpoch) {
+                errors.put(key, Errors.INVALID_PRODUCER_EPOCH);
+            } else if (offsetsAddedToTxn) {
+                pendingOffsets.put(key, partition.committedOffset());
+                errors.put(key, Errors.NONE);
+            } else {
+                errors.put(key, Errors.UNKNOWN_TOPIC_OR_PARTITION);

Review comment:
       In this case, the client has sent TxnOffsetCommit before the partition has been added to the transaction, which is an illegal state transition. Could we throw an assertion error (or similar) directly so that the test fails?

##########
File path: clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionEventSimulationTest.java
##########
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.producer.internals;
+
+import org.apache.kafka.clients.ApiVersions;
+import org.apache.kafka.clients.MockClient;
+import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.internals.ClusterResourceListeners;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.record.Record;
+import org.apache.kafka.common.requests.FindCoordinatorRequest;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.test.TestUtils;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.Random;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * This test tries to test out the EOS robustness on the client side. It features a {@link TransactionSimulationCoordinator}
+ * which handles the incoming transactional produce/metadata requests and gives basic feedback through {@link MockClient}.
+ *
+ * Each iteration the transaction manager will append one record through accumulator and commit offset at the same time. The request
+ * being transmitted is not guaranteed to be processed or processed correctly, so a state checking loop is enforced to make the client
+ * and the coordinator interact with each other and ensure the state could be eventually clean using {@link TransactionManager#isReady}.
+ * By the end of the test we will check whether all the committed transactions are successfully materialized on the coordinator side.
+ *
+ * Features supported:
+ * 
+ * 1. Randomly abort transaction
+ * 2. Fault injection on response
+ * 3. Random message drop
+ */
+public class TransactionEventSimulationTest {
+
+    private TransactionManager transactionManager;
+    private TransactionSimulationCoordinator transactionCoordinator;
+    private Sender sender;
+    private final LogContext logContext = new LogContext();
+
+    private final MockTime time = new MockTime();
+    private final int requestTimeoutMs = 100;
+    private final int retryBackOffMs = 0;
+    private final long apiVersion = 0L;
+
+    private ProducerMetadata metadata = new ProducerMetadata(0, Long.MAX_VALUE, 10,
+        new LogContext(), new ClusterResourceListeners(), time);

Review comment:
       nit: use `logContext`

##########
File path: clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionEventSimulationTest.java
##########
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.producer.internals;
+
+import org.apache.kafka.clients.ApiVersions;
+import org.apache.kafka.clients.MockClient;
+import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.internals.ClusterResourceListeners;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.record.Record;
+import org.apache.kafka.common.requests.FindCoordinatorRequest;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.test.TestUtils;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.Random;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * This test tries to test out the EOS robustness on the client side. It features a {@link TransactionSimulationCoordinator}
+ * which handles the incoming transactional produce/metadata requests and gives basic feedback through {@link MockClient}.
+ *
+ * Each iteration the transaction manager will append one record through accumulator and commit offset at the same time. The request
+ * being transmitted is not guaranteed to be processed or processed correctly, so a state checking loop is enforced to make the client
+ * and the coordinator interact with each other and ensure the state could be eventually clean using {@link TransactionManager#isReady}.
+ * By the end of the test we will check whether all the committed transactions are successfully materialized on the coordinator side.
+ *
+ * Features supported:
+ * 
+ * 1. Randomly abort transaction
+ * 2. Fault injection on response
+ * 3. Random message drop
+ */
+public class TransactionEventSimulationTest {
+
+    private TransactionManager transactionManager;
+    private TransactionSimulationCoordinator transactionCoordinator;
+    private Sender sender;
+    private final LogContext logContext = new LogContext();
+
+    private final MockTime time = new MockTime();
+    private final int requestTimeoutMs = 100;
+    private final int retryBackOffMs = 0;
+    private final long apiVersion = 0L;
+
+    private ProducerMetadata metadata = new ProducerMetadata(0, Long.MAX_VALUE, 10,
+        new LogContext(), new ClusterResourceListeners(), time);
+    private MockClient client = new MockClient(time, metadata);
+
+    @Before
+    public void setup() {
+        transactionManager = new TransactionManager(logContext, "txn-id",
+            requestTimeoutMs, apiVersion, new ApiVersions(), false);
+        transactionCoordinator = new TransactionSimulationCoordinator(client);
+    }
+
+    @Test
+    public void simulateTxnEvents() throws InterruptedException {
+        final int batchSize = 100;
+        final int lingerMs = 0;
+        final int deliveryTimeoutMs = 10;
+
+        RecordAccumulator accumulator = new RecordAccumulator(logContext, batchSize, CompressionType.GZIP,
+            lingerMs, retryBackOffMs, deliveryTimeoutMs, new Metrics(), "accumulator", time, new ApiVersions(), transactionManager,
+            new BufferPool(1000, 100, new Metrics(), time, "producer-internal-metrics"));
+
+        metadata.add("topic", time.milliseconds());
+        metadata.update(metadata.newMetadataRequestAndVersion(time.milliseconds()).requestVersion,
+            TestUtils.metadataUpdateWith(1, Collections.singletonMap("topic", 2)), true, time.milliseconds());
+
+        sender = new Sender(logContext, client, metadata, accumulator, false, 100, (short) 1,
+            Integer.MAX_VALUE, new SenderMetricsRegistry(new Metrics()), time, requestTimeoutMs, 10, transactionManager, new ApiVersions());
+
+        transactionManager.initializeTransactions();
+        sender.runOnce();
+        resolvePendingRequests();
+        final int numTransactions = 100;
+
+        TopicPartition key = new TopicPartition("topic", 0);
+        long committedOffsets = 0L;
+        Random abortTxn = new Random();
+        client.prepareMetadataUpdate(TestUtils.metadataUpdateWith(1, Collections.singletonMap("topic", 2)));
+        final long timestamp = 0L;
+        final int maxBlockTime = 0;
+
+        for (int i = 0; i < numTransactions; i++) {
+            transactionManager.beginTransaction();
+            transactionManager.maybeAddPartitionToTransaction(key);
+            accumulator.append(key, timestamp, new byte[1], new byte[1],
+                Record.EMPTY_HEADERS, null, maxBlockTime, false, time.milliseconds());
+            transactionManager.sendOffsetsToTransaction(
+                Collections.singletonMap(key, new OffsetAndMetadata(committedOffsets)),
+                new ConsumerGroupMetadata("group"));
+
+            if (abortTxn.nextBoolean()) {
+                transactionManager.beginCommit();
+                committedOffsets += 1;
+            } else {
+                transactionManager.beginAbort();
+            }
+
+            resolvePendingRequests();
+        }
+
+        assertTrue(transactionCoordinator.persistentPartitionData().containsKey(key));
+        assertTrue(transactionCoordinator.committedOffsets().containsKey(key));
+        assertEquals(committedOffsets - 1, (long) transactionCoordinator.committedOffsets().get(key));
+    }
+
+    private void resolvePendingRequests() {
+        Random dropMessageRandom = new Random();

Review comment:
       It would be useful for the simulation test to be deterministic, so that failures are easy to reproduce. Perhaps we can use a shared `Random` instance (between this class and the coordinator) with a fixed seed.

##########
File path: clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionSimulationCoordinator.java
##########
@@ -0,0 +1,316 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.producer.internals;
+
+import org.apache.kafka.clients.ClientRequest;
+import org.apache.kafka.clients.MockClient;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.message.AddOffsetsToTxnResponseData;
+import org.apache.kafka.common.message.EndTxnResponseData;
+import org.apache.kafka.common.message.FindCoordinatorResponseData;
+import org.apache.kafka.common.message.InitProducerIdResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.common.record.Record;
+import org.apache.kafka.common.requests.AbstractRequest;
+import org.apache.kafka.common.requests.AbstractResponse;
+import org.apache.kafka.common.requests.AddOffsetsToTxnRequest;
+import org.apache.kafka.common.requests.AddOffsetsToTxnResponse;
+import org.apache.kafka.common.requests.AddPartitionsToTxnRequest;
+import org.apache.kafka.common.requests.AddPartitionsToTxnResponse;
+import org.apache.kafka.common.requests.EndTxnRequest;
+import org.apache.kafka.common.requests.EndTxnResponse;
+import org.apache.kafka.common.requests.FindCoordinatorRequest;
+import org.apache.kafka.common.requests.FindCoordinatorResponse;
+import org.apache.kafka.common.requests.InitProducerIdRequest;
+import org.apache.kafka.common.requests.InitProducerIdResponse;
+import org.apache.kafka.common.requests.ProduceRequest;
+import org.apache.kafka.common.requests.ProduceResponse;
+import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse;
+import org.apache.kafka.common.requests.TransactionResult;
+import org.apache.kafka.common.requests.TxnOffsetCommitRequest;
+import org.apache.kafka.common.requests.TxnOffsetCommitResponse;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Random;
+
+import static org.apache.kafka.common.record.RecordBatch.NO_PRODUCER_EPOCH;
+import static org.apache.kafka.common.record.RecordBatch.NO_PRODUCER_ID;
+
+/**
+ * A reduced functionality of a combination of transaction coordinator and group coordinator.
+ * It provides basic event handling from {@link Sender} with transaction turned on.
+ *
+ * Random fault injection is supported as well, which will return a retriable error to
+ * the client.
+ */
+class TransactionSimulationCoordinator {
+
+    private final Map<TopicPartition, List<Record>> pendingPartitionData;
+    private final Map<TopicPartition, Long> pendingOffsets;
+    private boolean offsetsAddedToTxn = false;
+
+    private long currentProducerId = 0L;
+    private short currentEpoch = 0;
+    private Random faultInjectRandom = new Random();
+
+    public Map<TopicPartition, List<Record>> persistentPartitionData() {
+        return persistentPartitionData;
+    }
+
+    public Map<TopicPartition, Long> committedOffsets() {
+        return committedOffsets;
+    }
+
+    private final Map<TopicPartition, List<Record>> persistentPartitionData;
+    private final Map<TopicPartition, Long> committedOffsets;
+
+    private final MockClient networkClient;
+    private final int throttleTimeMs = 10;
+
+    TransactionSimulationCoordinator(MockClient networkClient) {
+        this.networkClient = networkClient;
+        this.pendingPartitionData = new HashMap<>();
+        this.pendingOffsets = new HashMap<>();
+        this.persistentPartitionData = new HashMap<>();
+        this.committedOffsets = new HashMap<>();
+    }
+
+    void runOnce(boolean dropMessage) {
+        Queue<ClientRequest> incomingRequests = networkClient.requests();
+
+        final boolean faultInject = faultInjectRandom.nextBoolean();

Review comment:
       I wonder whether it would be more useful to control the faults more explicitly. For example, we could add a hook that makes the coordinator temporarily unavailable and later restores it.

##########
File path: clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionSimulationCoordinator.java
##########
@@ -0,0 +1,316 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.producer.internals;
+
+import org.apache.kafka.clients.ClientRequest;
+import org.apache.kafka.clients.MockClient;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.message.AddOffsetsToTxnResponseData;
+import org.apache.kafka.common.message.EndTxnResponseData;
+import org.apache.kafka.common.message.FindCoordinatorResponseData;
+import org.apache.kafka.common.message.InitProducerIdResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.common.record.Record;
+import org.apache.kafka.common.requests.AbstractRequest;
+import org.apache.kafka.common.requests.AbstractResponse;
+import org.apache.kafka.common.requests.AddOffsetsToTxnRequest;
+import org.apache.kafka.common.requests.AddOffsetsToTxnResponse;
+import org.apache.kafka.common.requests.AddPartitionsToTxnRequest;
+import org.apache.kafka.common.requests.AddPartitionsToTxnResponse;
+import org.apache.kafka.common.requests.EndTxnRequest;
+import org.apache.kafka.common.requests.EndTxnResponse;
+import org.apache.kafka.common.requests.FindCoordinatorRequest;
+import org.apache.kafka.common.requests.FindCoordinatorResponse;
+import org.apache.kafka.common.requests.InitProducerIdRequest;
+import org.apache.kafka.common.requests.InitProducerIdResponse;
+import org.apache.kafka.common.requests.ProduceRequest;
+import org.apache.kafka.common.requests.ProduceResponse;
+import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse;
+import org.apache.kafka.common.requests.TransactionResult;
+import org.apache.kafka.common.requests.TxnOffsetCommitRequest;
+import org.apache.kafka.common.requests.TxnOffsetCommitResponse;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Random;
+
+import static org.apache.kafka.common.record.RecordBatch.NO_PRODUCER_EPOCH;
+import static org.apache.kafka.common.record.RecordBatch.NO_PRODUCER_ID;
+
+/**
+ * A reduced functionality of a combination of transaction coordinator and group coordinator.
+ * It provides basic event handling from {@link Sender} with transaction turned on.
+ *
+ * Random fault injection is supported as well, which will return a retriable error to
+ * the client.
+ */
+class TransactionSimulationCoordinator {
+
+    private final Map<TopicPartition, List<Record>> pendingPartitionData;
+    private final Map<TopicPartition, Long> pendingOffsets;
+    private boolean offsetsAddedToTxn = false;
+
+    private long currentProducerId = 0L;
+    private short currentEpoch = 0;
+    private Random faultInjectRandom = new Random();
+
+    public Map<TopicPartition, List<Record>> persistentPartitionData() {
+        return persistentPartitionData;
+    }
+
+    public Map<TopicPartition, Long> committedOffsets() {
+        return committedOffsets;
+    }
+
+    private final Map<TopicPartition, List<Record>> persistentPartitionData;
+    private final Map<TopicPartition, Long> committedOffsets;
+
+    private final MockClient networkClient;
+    private final int throttleTimeMs = 10;
+
+    TransactionSimulationCoordinator(MockClient networkClient) {
+        this.networkClient = networkClient;
+        this.pendingPartitionData = new HashMap<>();
+        this.pendingOffsets = new HashMap<>();
+        this.persistentPartitionData = new HashMap<>();
+        this.committedOffsets = new HashMap<>();
+    }
+
+    void runOnce(boolean dropMessage) {

Review comment:
       nit: maybe `disconnect` is a better name, given the actual behavior.

##########
File path: clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionSimulationCoordinator.java
##########
@@ -0,0 +1,316 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.producer.internals;
+
+import org.apache.kafka.clients.ClientRequest;
+import org.apache.kafka.clients.MockClient;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.message.AddOffsetsToTxnResponseData;
+import org.apache.kafka.common.message.EndTxnResponseData;
+import org.apache.kafka.common.message.FindCoordinatorResponseData;
+import org.apache.kafka.common.message.InitProducerIdResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.common.record.Record;
+import org.apache.kafka.common.requests.AbstractRequest;
+import org.apache.kafka.common.requests.AbstractResponse;
+import org.apache.kafka.common.requests.AddOffsetsToTxnRequest;
+import org.apache.kafka.common.requests.AddOffsetsToTxnResponse;
+import org.apache.kafka.common.requests.AddPartitionsToTxnRequest;
+import org.apache.kafka.common.requests.AddPartitionsToTxnResponse;
+import org.apache.kafka.common.requests.EndTxnRequest;
+import org.apache.kafka.common.requests.EndTxnResponse;
+import org.apache.kafka.common.requests.FindCoordinatorRequest;
+import org.apache.kafka.common.requests.FindCoordinatorResponse;
+import org.apache.kafka.common.requests.InitProducerIdRequest;
+import org.apache.kafka.common.requests.InitProducerIdResponse;
+import org.apache.kafka.common.requests.ProduceRequest;
+import org.apache.kafka.common.requests.ProduceResponse;
+import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse;
+import org.apache.kafka.common.requests.TransactionResult;
+import org.apache.kafka.common.requests.TxnOffsetCommitRequest;
+import org.apache.kafka.common.requests.TxnOffsetCommitResponse;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Random;
+
+import static org.apache.kafka.common.record.RecordBatch.NO_PRODUCER_EPOCH;
+import static org.apache.kafka.common.record.RecordBatch.NO_PRODUCER_ID;
+
+/**
+ * A reduced functionality of a combination of transaction coordinator and group coordinator.
+ * It provides basic event handling from {@link Sender} with transaction turned on.
+ *
+ * Random fault injection is supported as well, which will return a retriable error to
+ * the client.
+ */
+class TransactionSimulationCoordinator {
+
+    private final Map<TopicPartition, List<Record>> pendingPartitionData;
+    private final Map<TopicPartition, Long> pendingOffsets;
+    private boolean offsetsAddedToTxn = false;
+
+    private long currentProducerId = 0L;
+    private short currentEpoch = 0;
+    private Random faultInjectRandom = new Random();
+
+    public Map<TopicPartition, List<Record>> persistentPartitionData() {
+        return persistentPartitionData;
+    }
+
+    public Map<TopicPartition, Long> committedOffsets() {
+        return committedOffsets;
+    }
+
+    private final Map<TopicPartition, List<Record>> persistentPartitionData;
+    private final Map<TopicPartition, Long> committedOffsets;
+
+    private final MockClient networkClient;
+    private final int throttleTimeMs = 10;
+
+    TransactionSimulationCoordinator(MockClient networkClient) {
+        this.networkClient = networkClient;
+        this.pendingPartitionData = new HashMap<>();
+        this.pendingOffsets = new HashMap<>();
+        this.persistentPartitionData = new HashMap<>();
+        this.committedOffsets = new HashMap<>();
+    }
+
+    void runOnce(boolean dropMessage) {
+        Queue<ClientRequest> incomingRequests = networkClient.requests();
+
+        final boolean faultInject = faultInjectRandom.nextBoolean();
+
+        if (incomingRequests.peek() == null) {
+            return;
+        }
+
+        final AbstractResponse response;
+        AbstractRequest nextRequest = incomingRequests.peek().requestBuilder().build();
+        if (nextRequest instanceof FindCoordinatorRequest) {
+            response = handleFindCoordinator(faultInject);
+        } else if (nextRequest instanceof InitProducerIdRequest) {
+            response = handleInitProducerId((InitProducerIdRequest) nextRequest, faultInject);
+        } else if (nextRequest instanceof AddPartitionsToTxnRequest) {
+            response = handleAddPartitionToTxn((AddPartitionsToTxnRequest) nextRequest, faultInject);
+        } else if (nextRequest instanceof AddOffsetsToTxnRequest) {
+            response = handleAddOffsetsToTxn((AddOffsetsToTxnRequest) nextRequest, faultInject);
+        } else if (nextRequest instanceof TxnOffsetCommitRequest) {
+            response = handleTxnCommit((TxnOffsetCommitRequest) nextRequest, faultInject);
+        } else if (nextRequest instanceof ProduceRequest) {
+            response = handleProduce((ProduceRequest) nextRequest, faultInject);
+        } else if (nextRequest instanceof EndTxnRequest) {
+            response = handleEndTxn((EndTxnRequest) nextRequest, faultInject);
+        } else {
+            throw new IllegalArgumentException("Unknown request: " + nextRequest);
+        }
+
+        networkClient.respond(response, dropMessage);
+    }
+
+    private FindCoordinatorResponse handleFindCoordinator(final boolean faultInject) {
+        if (faultInject) {
+            return new FindCoordinatorResponse(
+                new FindCoordinatorResponseData()
+                    .setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code()));
+        } else {
+            return new FindCoordinatorResponse(
+                new FindCoordinatorResponseData()
+                    .setErrorCode(Errors.NONE.code())
+                    .setHost("localhost")
+                    .setNodeId(0)
+                    .setPort(2211)
+            );
+        }
+    }
+
+    private InitProducerIdResponse handleInitProducerId(InitProducerIdRequest request,
+                                                        final boolean faultInject) {
+        if (faultInject) {
+            return new InitProducerIdResponse(
+                new InitProducerIdResponseData()
+                    .setErrorCode(Errors.NOT_COORDINATOR.code())
+            );
+        } else if (request.data.producerId() != NO_PRODUCER_ID &&
+                    request.data.producerId() != currentProducerId) {
+            return new InitProducerIdResponse(
+                new InitProducerIdResponseData()
+                    .setErrorCode(Errors.UNKNOWN_PRODUCER_ID.code())
+                    .setThrottleTimeMs(throttleTimeMs)
+            );
+        } else if (request.data.producerEpoch() != NO_PRODUCER_EPOCH &&
+                    request.data.producerEpoch() != currentEpoch) {
+            return new InitProducerIdResponse(
+                new InitProducerIdResponseData()
+                    .setErrorCode(Errors.INVALID_PRODUCER_EPOCH.code())
+                    .setThrottleTimeMs(throttleTimeMs)
+            );
+        } else {
+            currentProducerId += 1;
+            currentEpoch += 1;
+            return new InitProducerIdResponse(
+                new InitProducerIdResponseData()
+                    .setProducerId(currentProducerId)
+                    .setProducerEpoch(currentEpoch)
+                    .setErrorCode(Errors.NONE.code())
+            );
+        }
+    }
+
+    private AddPartitionsToTxnResponse handleAddPartitionToTxn(AddPartitionsToTxnRequest request,
+                                                               final boolean faultInject) {
+        Map<TopicPartition, Errors> errors = new HashMap<>();
+        request.partitions().forEach(topicPartition -> {
+            if (faultInject) {
+                errors.put(topicPartition, Errors.COORDINATOR_NOT_AVAILABLE);
+            } else if (request.data.producerId() != currentProducerId) {
+                errors.put(topicPartition, Errors.UNKNOWN_PRODUCER_ID);
+            } else if (request.data.producerEpoch() != currentEpoch) {
+                errors.put(topicPartition, Errors.INVALID_PRODUCER_EPOCH);
+            } else {
+                errors.put(topicPartition, Errors.NONE);
+            }
+        });
+
+        return new AddPartitionsToTxnResponse(
+            throttleTimeMs,
+            errors
+        );
+    }
+
+    private AddOffsetsToTxnResponse handleAddOffsetsToTxn(AddOffsetsToTxnRequest request,
+                                                          final boolean faultInject) {
+        if (faultInject) {
+            return new AddOffsetsToTxnResponse(
+                new AddOffsetsToTxnResponseData()
+                    .setErrorCode(Errors.COORDINATOR_LOAD_IN_PROGRESS.code())
+                    .setThrottleTimeMs(throttleTimeMs)
+            );
+        } else if (request.data.producerId() != currentProducerId) {
+            return new AddOffsetsToTxnResponse(
+                new AddOffsetsToTxnResponseData()
+                    .setErrorCode(Errors.UNKNOWN_PRODUCER_ID.code())
+                    .setThrottleTimeMs(throttleTimeMs)
+            );
+        } else if (request.data.producerEpoch() != currentEpoch) {
+            return new AddOffsetsToTxnResponse(
+                new AddOffsetsToTxnResponseData()
+                    .setErrorCode(Errors.INVALID_PRODUCER_EPOCH.code())
+                    .setThrottleTimeMs(throttleTimeMs)
+            );
+        } else {
+            offsetsAddedToTxn = true;
+            return new AddOffsetsToTxnResponse(
+                new AddOffsetsToTxnResponseData()
+                    .setErrorCode(Errors.NONE.code())
+                    .setThrottleTimeMs(throttleTimeMs)
+            );
+        }
+    }
+
+    private AbstractResponse handleTxnCommit(TxnOffsetCommitRequest request,
+                                             final boolean faultInject) {
+        Map<TopicPartition, Errors> errors = new HashMap<>();
+        request.data.topics().forEach(topic -> topic.partitions().forEach(partition -> {
+            TopicPartition key = new TopicPartition(topic.name(), partition.partitionIndex());
+            if (faultInject) {
+                errors.put(key, Errors.COORDINATOR_LOAD_IN_PROGRESS);
+            } else if (request.data.producerId() != currentProducerId) {
+                errors.put(key, Errors.UNKNOWN_PRODUCER_ID);
+            } else if (request.data.producerEpoch() != currentEpoch) {
+                errors.put(key, Errors.INVALID_PRODUCER_EPOCH);
+            } else if (offsetsAddedToTxn) {
+                pendingOffsets.put(key, partition.committedOffset());
+                errors.put(key, Errors.NONE);
+            } else {
+                errors.put(key, Errors.UNKNOWN_TOPIC_OR_PARTITION);
+            }
+        }));
+
+        return new TxnOffsetCommitResponse(
+            throttleTimeMs,
+            errors
+        );
+    }
+
+    private AbstractResponse handleProduce(ProduceRequest request,

Review comment:
       To make this really interesting, we would need to add some sequence-number bookkeeping. Really, it's the sequence/epoch bookkeeping that makes the implementation so complex.

##########
File path: clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionSimulationCoordinator.java
##########
@@ -0,0 +1,316 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.producer.internals;
+
+import org.apache.kafka.clients.ClientRequest;
+import org.apache.kafka.clients.MockClient;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.message.AddOffsetsToTxnResponseData;
+import org.apache.kafka.common.message.EndTxnResponseData;
+import org.apache.kafka.common.message.FindCoordinatorResponseData;
+import org.apache.kafka.common.message.InitProducerIdResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.common.record.Record;
+import org.apache.kafka.common.requests.AbstractRequest;
+import org.apache.kafka.common.requests.AbstractResponse;
+import org.apache.kafka.common.requests.AddOffsetsToTxnRequest;
+import org.apache.kafka.common.requests.AddOffsetsToTxnResponse;
+import org.apache.kafka.common.requests.AddPartitionsToTxnRequest;
+import org.apache.kafka.common.requests.AddPartitionsToTxnResponse;
+import org.apache.kafka.common.requests.EndTxnRequest;
+import org.apache.kafka.common.requests.EndTxnResponse;
+import org.apache.kafka.common.requests.FindCoordinatorRequest;
+import org.apache.kafka.common.requests.FindCoordinatorResponse;
+import org.apache.kafka.common.requests.InitProducerIdRequest;
+import org.apache.kafka.common.requests.InitProducerIdResponse;
+import org.apache.kafka.common.requests.ProduceRequest;
+import org.apache.kafka.common.requests.ProduceResponse;
+import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse;
+import org.apache.kafka.common.requests.TransactionResult;
+import org.apache.kafka.common.requests.TxnOffsetCommitRequest;
+import org.apache.kafka.common.requests.TxnOffsetCommitResponse;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Random;
+
+import static org.apache.kafka.common.record.RecordBatch.NO_PRODUCER_EPOCH;
+import static org.apache.kafka.common.record.RecordBatch.NO_PRODUCER_ID;
+
+/**
+ * A reduced functionality of a combination of transaction coordinator and group coordinator.
+ * It provides basic event handling from {@link Sender} with transaction turned on.
+ *
+ * Random fault injection is supported as well, which will return a retriable error to
+ * the client.
+ */
+class TransactionSimulationCoordinator {
+
+    private final Map<TopicPartition, List<Record>> pendingPartitionData;
+    private final Map<TopicPartition, Long> pendingOffsets;
+    private boolean offsetsAddedToTxn = false;
+
+    private long currentProducerId = 0L;
+    private short currentEpoch = 0;
+    private Random faultInjectRandom = new Random();
+
+    public Map<TopicPartition, List<Record>> persistentPartitionData() {
+        return persistentPartitionData;
+    }
+
+    public Map<TopicPartition, Long> committedOffsets() {
+        return committedOffsets;
+    }
+
+    private final Map<TopicPartition, List<Record>> persistentPartitionData;
+    private final Map<TopicPartition, Long> committedOffsets;
+
+    private final MockClient networkClient;
+    private final int throttleTimeMs = 10;
+
+    TransactionSimulationCoordinator(MockClient networkClient) {
+        this.networkClient = networkClient;
+        this.pendingPartitionData = new HashMap<>();
+        this.pendingOffsets = new HashMap<>();
+        this.persistentPartitionData = new HashMap<>();
+        this.committedOffsets = new HashMap<>();
+    }
+
+    void runOnce(boolean dropMessage) {
+        Queue<ClientRequest> incomingRequests = networkClient.requests();
+
+        final boolean faultInject = faultInjectRandom.nextBoolean();
+
+        if (incomingRequests.peek() == null) {
+            return;
+        }
+
+        final AbstractResponse response;
+        AbstractRequest nextRequest = incomingRequests.peek().requestBuilder().build();
+        if (nextRequest instanceof FindCoordinatorRequest) {
+            response = handleFindCoordinator(faultInject);
+        } else if (nextRequest instanceof InitProducerIdRequest) {
+            response = handleInitProducerId((InitProducerIdRequest) nextRequest, faultInject);
+        } else if (nextRequest instanceof AddPartitionsToTxnRequest) {
+            response = handleAddPartitionToTxn((AddPartitionsToTxnRequest) nextRequest, faultInject);
+        } else if (nextRequest instanceof AddOffsetsToTxnRequest) {
+            response = handleAddOffsetsToTxn((AddOffsetsToTxnRequest) nextRequest, faultInject);
+        } else if (nextRequest instanceof TxnOffsetCommitRequest) {
+            response = handleTxnCommit((TxnOffsetCommitRequest) nextRequest, faultInject);
+        } else if (nextRequest instanceof ProduceRequest) {
+            response = handleProduce((ProduceRequest) nextRequest, faultInject);
+        } else if (nextRequest instanceof EndTxnRequest) {
+            response = handleEndTxn((EndTxnRequest) nextRequest, faultInject);
+        } else {
+            throw new IllegalArgumentException("Unknown request: " + nextRequest);
+        }
+
+        networkClient.respond(response, dropMessage);
+    }
+
+    private FindCoordinatorResponse handleFindCoordinator(final boolean faultInject) {
+        if (faultInject) {
+            return new FindCoordinatorResponse(
+                new FindCoordinatorResponseData()
+                    .setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code()));
+        } else {
+            return new FindCoordinatorResponse(
+                new FindCoordinatorResponseData()
+                    .setErrorCode(Errors.NONE.code())
+                    .setHost("localhost")
+                    .setNodeId(0)
+                    .setPort(2211)
+            );
+        }
+    }
+
+    private InitProducerIdResponse handleInitProducerId(InitProducerIdRequest request,
+                                                        final boolean faultInject) {
+        if (faultInject) {
+            return new InitProducerIdResponse(
+                new InitProducerIdResponseData()
+                    .setErrorCode(Errors.NOT_COORDINATOR.code())
+            );
+        } else if (request.data.producerId() != NO_PRODUCER_ID &&
+                    request.data.producerId() != currentProducerId) {
+            return new InitProducerIdResponse(
+                new InitProducerIdResponseData()
+                    .setErrorCode(Errors.UNKNOWN_PRODUCER_ID.code())
+                    .setThrottleTimeMs(throttleTimeMs)
+            );
+        } else if (request.data.producerEpoch() != NO_PRODUCER_EPOCH &&
+                    request.data.producerEpoch() != currentEpoch) {
+            return new InitProducerIdResponse(
+                new InitProducerIdResponseData()
+                    .setErrorCode(Errors.INVALID_PRODUCER_EPOCH.code())
+                    .setThrottleTimeMs(throttleTimeMs)
+            );
+        } else {
+            currentProducerId += 1;
+            currentEpoch += 1;
+            return new InitProducerIdResponse(
+                new InitProducerIdResponseData()
+                    .setProducerId(currentProducerId)
+                    .setProducerEpoch(currentEpoch)
+                    .setErrorCode(Errors.NONE.code())
+            );
+        }
+    }
+
+    private AddPartitionsToTxnResponse handleAddPartitionToTxn(AddPartitionsToTxnRequest request,
+                                                               final boolean faultInject) {
+        Map<TopicPartition, Errors> errors = new HashMap<>();
+        request.partitions().forEach(topicPartition -> {
+            if (faultInject) {
+                errors.put(topicPartition, Errors.COORDINATOR_NOT_AVAILABLE);
+            } else if (request.data.producerId() != currentProducerId) {
+                errors.put(topicPartition, Errors.UNKNOWN_PRODUCER_ID);
+            } else if (request.data.producerEpoch() != currentEpoch) {
+                errors.put(topicPartition, Errors.INVALID_PRODUCER_EPOCH);
+            } else {
+                errors.put(topicPartition, Errors.NONE);
+            }
+        });
+
+        return new AddPartitionsToTxnResponse(
+            throttleTimeMs,
+            errors
+        );
+    }
+
+    private AddOffsetsToTxnResponse handleAddOffsetsToTxn(AddOffsetsToTxnRequest request,
+                                                          final boolean faultInject) {
+        if (faultInject) {
+            return new AddOffsetsToTxnResponse(
+                new AddOffsetsToTxnResponseData()
+                    .setErrorCode(Errors.COORDINATOR_LOAD_IN_PROGRESS.code())
+                    .setThrottleTimeMs(throttleTimeMs)
+            );
+        } else if (request.data.producerId() != currentProducerId) {
+            return new AddOffsetsToTxnResponse(
+                new AddOffsetsToTxnResponseData()
+                    .setErrorCode(Errors.UNKNOWN_PRODUCER_ID.code())
+                    .setThrottleTimeMs(throttleTimeMs)
+            );
+        } else if (request.data.producerEpoch() != currentEpoch) {
+            return new AddOffsetsToTxnResponse(
+                new AddOffsetsToTxnResponseData()
+                    .setErrorCode(Errors.INVALID_PRODUCER_EPOCH.code())
+                    .setThrottleTimeMs(throttleTimeMs)
+            );
+        } else {
+            offsetsAddedToTxn = true;
+            return new AddOffsetsToTxnResponse(
+                new AddOffsetsToTxnResponseData()
+                    .setErrorCode(Errors.NONE.code())
+                    .setThrottleTimeMs(throttleTimeMs)
+            );
+        }
+    }
+
+    private AbstractResponse handleTxnCommit(TxnOffsetCommitRequest request,
+                                             final boolean faultInject) {
+        Map<TopicPartition, Errors> errors = new HashMap<>();
+        request.data.topics().forEach(topic -> topic.partitions().forEach(partition -> {
+            TopicPartition key = new TopicPartition(topic.name(), partition.partitionIndex());
+            if (faultInject) {
+                errors.put(key, Errors.COORDINATOR_LOAD_IN_PROGRESS);
+            } else if (request.data.producerId() != currentProducerId) {
+                errors.put(key, Errors.UNKNOWN_PRODUCER_ID);
+            } else if (request.data.producerEpoch() != currentEpoch) {
+                errors.put(key, Errors.INVALID_PRODUCER_EPOCH);
+            } else if (offsetsAddedToTxn) {
+                pendingOffsets.put(key, partition.committedOffset());
+                errors.put(key, Errors.NONE);
+            } else {
+                errors.put(key, Errors.UNKNOWN_TOPIC_OR_PARTITION);
+            }
+        }));
+
+        return new TxnOffsetCommitResponse(
+            throttleTimeMs,
+            errors
+        );
+    }
+
+    private AbstractResponse handleProduce(ProduceRequest request,
+                                           final boolean faultInject) {
+        Map<TopicPartition, PartitionResponse> errors = new HashMap<>();
+        Map<TopicPartition, MemoryRecords> partitionRecords = request.partitionRecordsOrFail();
+
+        partitionRecords.forEach((topicPartition, records) -> {
+            if (faultInject) {
+                // Trigger KIP-360 path.
+                errors.put(topicPartition, new PartitionResponse(Errors.UNKNOWN_PRODUCER_ID));
+            } else {
+                List<Record> sentRecords = pendingPartitionData.getOrDefault(topicPartition, new ArrayList<>());

Review comment:
       Similar to the offset commit path (which checks `offsetsAddedToTxn` before accepting offsets), it would be useful to validate here that each partition written to was first properly added to the transaction.

##########
File path: clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionSimulationCoordinator.java
##########
@@ -0,0 +1,316 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.producer.internals;
+
+import org.apache.kafka.clients.ClientRequest;
+import org.apache.kafka.clients.MockClient;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.message.AddOffsetsToTxnResponseData;
+import org.apache.kafka.common.message.EndTxnResponseData;
+import org.apache.kafka.common.message.FindCoordinatorResponseData;
+import org.apache.kafka.common.message.InitProducerIdResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.common.record.Record;
+import org.apache.kafka.common.requests.AbstractRequest;
+import org.apache.kafka.common.requests.AbstractResponse;
+import org.apache.kafka.common.requests.AddOffsetsToTxnRequest;
+import org.apache.kafka.common.requests.AddOffsetsToTxnResponse;
+import org.apache.kafka.common.requests.AddPartitionsToTxnRequest;
+import org.apache.kafka.common.requests.AddPartitionsToTxnResponse;
+import org.apache.kafka.common.requests.EndTxnRequest;
+import org.apache.kafka.common.requests.EndTxnResponse;
+import org.apache.kafka.common.requests.FindCoordinatorRequest;
+import org.apache.kafka.common.requests.FindCoordinatorResponse;
+import org.apache.kafka.common.requests.InitProducerIdRequest;
+import org.apache.kafka.common.requests.InitProducerIdResponse;
+import org.apache.kafka.common.requests.ProduceRequest;
+import org.apache.kafka.common.requests.ProduceResponse;
+import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse;
+import org.apache.kafka.common.requests.TransactionResult;
+import org.apache.kafka.common.requests.TxnOffsetCommitRequest;
+import org.apache.kafka.common.requests.TxnOffsetCommitResponse;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Random;
+
+import static org.apache.kafka.common.record.RecordBatch.NO_PRODUCER_EPOCH;
+import static org.apache.kafka.common.record.RecordBatch.NO_PRODUCER_ID;
+
+/**
+ * A reduced functionality of a combination of transaction coordinator and group coordinator.
+ * It provides basic event handling from {@link Sender} with transaction turned on.
+ *
+ * Random fault injection is supported as well, which will return a retriable error to
+ * the client.
+ */
+class TransactionSimulationCoordinator {
+
+    private final Map<TopicPartition, List<Record>> pendingPartitionData;
+    private final Map<TopicPartition, Long> pendingOffsets;
+    private boolean offsetsAddedToTxn = false;
+
+    private long currentProducerId = 0L;
+    private short currentEpoch = 0;
+    private Random faultInjectRandom = new Random();
+
+    public Map<TopicPartition, List<Record>> persistentPartitionData() {
+        return persistentPartitionData;
+    }
+
+    public Map<TopicPartition, Long> committedOffsets() {
+        return committedOffsets;
+    }
+
+    private final Map<TopicPartition, List<Record>> persistentPartitionData;
+    private final Map<TopicPartition, Long> committedOffsets;
+
+    private final MockClient networkClient;
+    private final int throttleTimeMs = 10;
+
+    TransactionSimulationCoordinator(MockClient networkClient) {
+        this.networkClient = networkClient;
+        this.pendingPartitionData = new HashMap<>();
+        this.pendingOffsets = new HashMap<>();
+        this.persistentPartitionData = new HashMap<>();
+        this.committedOffsets = new HashMap<>();
+    }
+
+    void runOnce(boolean dropMessage) {
+        Queue<ClientRequest> incomingRequests = networkClient.requests();
+
+        final boolean faultInject = faultInjectRandom.nextBoolean();
+
+        if (incomingRequests.peek() == null) {
+            return;
+        }
+
+        final AbstractResponse response;
+        AbstractRequest nextRequest = incomingRequests.peek().requestBuilder().build();
+        if (nextRequest instanceof FindCoordinatorRequest) {
+            response = handleFindCoordinator(faultInject);
+        } else if (nextRequest instanceof InitProducerIdRequest) {
+            response = handleInitProducerId((InitProducerIdRequest) nextRequest, faultInject);
+        } else if (nextRequest instanceof AddPartitionsToTxnRequest) {
+            response = handleAddPartitionToTxn((AddPartitionsToTxnRequest) nextRequest, faultInject);
+        } else if (nextRequest instanceof AddOffsetsToTxnRequest) {
+            response = handleAddOffsetsToTxn((AddOffsetsToTxnRequest) nextRequest, faultInject);
+        } else if (nextRequest instanceof TxnOffsetCommitRequest) {
+            response = handleTxnCommit((TxnOffsetCommitRequest) nextRequest, faultInject);
+        } else if (nextRequest instanceof ProduceRequest) {
+            response = handleProduce((ProduceRequest) nextRequest, faultInject);
+        } else if (nextRequest instanceof EndTxnRequest) {
+            response = handleEndTxn((EndTxnRequest) nextRequest, faultInject);
+        } else {
+            throw new IllegalArgumentException("Unknown request: " + nextRequest);
+        }
+
+        networkClient.respond(response, dropMessage);
+    }
+
+    private FindCoordinatorResponse handleFindCoordinator(final boolean faultInject) {
+        if (faultInject) {
+            return new FindCoordinatorResponse(
+                new FindCoordinatorResponseData()
+                    .setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code()));
+        } else {
+            return new FindCoordinatorResponse(
+                new FindCoordinatorResponseData()
+                    .setErrorCode(Errors.NONE.code())
+                    .setHost("localhost")
+                    .setNodeId(0)
+                    .setPort(2211)
+            );
+        }
+    }
+
+    private InitProducerIdResponse handleInitProducerId(InitProducerIdRequest request,
+                                                        final boolean faultInject) {
+        if (faultInject) {
+            return new InitProducerIdResponse(
+                new InitProducerIdResponseData()
+                    .setErrorCode(Errors.NOT_COORDINATOR.code())
+            );
+        } else if (request.data.producerId() != NO_PRODUCER_ID &&
+                    request.data.producerId() != currentProducerId) {
+            return new InitProducerIdResponse(
+                new InitProducerIdResponseData()
+                    .setErrorCode(Errors.UNKNOWN_PRODUCER_ID.code())
+                    .setThrottleTimeMs(throttleTimeMs)
+            );
+        } else if (request.data.producerEpoch() != NO_PRODUCER_EPOCH &&
+                    request.data.producerEpoch() != currentEpoch) {
+            return new InitProducerIdResponse(
+                new InitProducerIdResponseData()
+                    .setErrorCode(Errors.INVALID_PRODUCER_EPOCH.code())
+                    .setThrottleTimeMs(throttleTimeMs)
+            );
+        } else {
+            currentProducerId += 1;
+            currentEpoch += 1;
+            return new InitProducerIdResponse(
+                new InitProducerIdResponseData()
+                    .setProducerId(currentProducerId)
+                    .setProducerEpoch(currentEpoch)
+                    .setErrorCode(Errors.NONE.code())
+            );
+        }
+    }
+
+    private AddPartitionsToTxnResponse handleAddPartitionToTxn(AddPartitionsToTxnRequest request,
+                                                               final boolean faultInject) {
+        Map<TopicPartition, Errors> errors = new HashMap<>();
+        request.partitions().forEach(topicPartition -> {
+            if (faultInject) {
+                errors.put(topicPartition, Errors.COORDINATOR_NOT_AVAILABLE);
+            } else if (request.data.producerId() != currentProducerId) {
+                errors.put(topicPartition, Errors.UNKNOWN_PRODUCER_ID);
+            } else if (request.data.producerEpoch() != currentEpoch) {
+                errors.put(topicPartition, Errors.INVALID_PRODUCER_EPOCH);
+            } else {
+                errors.put(topicPartition, Errors.NONE);
+            }
+        });
+
+        return new AddPartitionsToTxnResponse(
+            throttleTimeMs,
+            errors
+        );
+    }
+
+    private AddOffsetsToTxnResponse handleAddOffsetsToTxn(AddOffsetsToTxnRequest request,
+                                                          final boolean faultInject) {
+        if (faultInject) {
+            return new AddOffsetsToTxnResponse(
+                new AddOffsetsToTxnResponseData()
+                    .setErrorCode(Errors.COORDINATOR_LOAD_IN_PROGRESS.code())
+                    .setThrottleTimeMs(throttleTimeMs)
+            );
+        } else if (request.data.producerId() != currentProducerId) {
+            return new AddOffsetsToTxnResponse(
+                new AddOffsetsToTxnResponseData()
+                    .setErrorCode(Errors.UNKNOWN_PRODUCER_ID.code())
+                    .setThrottleTimeMs(throttleTimeMs)
+            );
+        } else if (request.data.producerEpoch() != currentEpoch) {
+            return new AddOffsetsToTxnResponse(
+                new AddOffsetsToTxnResponseData()
+                    .setErrorCode(Errors.INVALID_PRODUCER_EPOCH.code())
+                    .setThrottleTimeMs(throttleTimeMs)
+            );
+        } else {
+            offsetsAddedToTxn = true;
+            return new AddOffsetsToTxnResponse(
+                new AddOffsetsToTxnResponseData()
+                    .setErrorCode(Errors.NONE.code())
+                    .setThrottleTimeMs(throttleTimeMs)
+            );
+        }
+    }
+
+    private AbstractResponse handleTxnCommit(TxnOffsetCommitRequest request,
+                                             final boolean faultInject) {
+        Map<TopicPartition, Errors> errors = new HashMap<>();
+        request.data.topics().forEach(topic -> topic.partitions().forEach(partition -> {
+            TopicPartition key = new TopicPartition(topic.name(), partition.partitionIndex());
+            if (faultInject) {
+                errors.put(key, Errors.COORDINATOR_LOAD_IN_PROGRESS);
+            } else if (request.data.producerId() != currentProducerId) {
+                errors.put(key, Errors.UNKNOWN_PRODUCER_ID);
+            } else if (request.data.producerEpoch() != currentEpoch) {
+                errors.put(key, Errors.INVALID_PRODUCER_EPOCH);
+            } else if (offsetsAddedToTxn) {
+                pendingOffsets.put(key, partition.committedOffset());
+                errors.put(key, Errors.NONE);
+            } else {
+                errors.put(key, Errors.UNKNOWN_TOPIC_OR_PARTITION);
+            }
+        }));
+
+        return new TxnOffsetCommitResponse(
+            throttleTimeMs,
+            errors
+        );
+    }
+
+    private AbstractResponse handleProduce(ProduceRequest request,
+                                           final boolean faultInject) {
+        Map<TopicPartition, PartitionResponse> errors = new HashMap<>();
+        Map<TopicPartition, MemoryRecords> partitionRecords = request.partitionRecordsOrFail();
+
+        partitionRecords.forEach((topicPartition, records) -> {
+            if (faultInject) {
+                // Trigger KIP-360 path.
+                errors.put(topicPartition, new PartitionResponse(Errors.UNKNOWN_PRODUCER_ID));
+            } else {
+                List<Record> sentRecords = pendingPartitionData.getOrDefault(topicPartition, new ArrayList<>());
+                for (Record partitionRecord  : records.records()) {
+                    sentRecords.add(partitionRecord);
+                }
+
+                pendingPartitionData.put(topicPartition, sentRecords);
+                errors.put(topicPartition, new PartitionResponse(Errors.NONE));
+            }
+        });
+
+        return new ProduceResponse(errors, throttleTimeMs);
+    }
+
+    private EndTxnResponse handleEndTxn(EndTxnRequest request, final boolean faultInject) {

Review comment:
       Another class of failure that we can simulate is when a request reaches the broker and gets handled, but the connection is lost before the response is sent.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] abbccdda commented on a change in pull request #8725: KAFKA-9608: Transaction Event Simulation Test

Posted by GitBox <gi...@apache.org>.
abbccdda commented on a change in pull request #8725:
URL: https://github.com/apache/kafka/pull/8725#discussion_r491150125



##########
File path: clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionSimulationCoordinator.java
##########
@@ -0,0 +1,316 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.producer.internals;
+
+import org.apache.kafka.clients.ClientRequest;
+import org.apache.kafka.clients.MockClient;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.message.AddOffsetsToTxnResponseData;
+import org.apache.kafka.common.message.EndTxnResponseData;
+import org.apache.kafka.common.message.FindCoordinatorResponseData;
+import org.apache.kafka.common.message.InitProducerIdResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.common.record.Record;
+import org.apache.kafka.common.requests.AbstractRequest;
+import org.apache.kafka.common.requests.AbstractResponse;
+import org.apache.kafka.common.requests.AddOffsetsToTxnRequest;
+import org.apache.kafka.common.requests.AddOffsetsToTxnResponse;
+import org.apache.kafka.common.requests.AddPartitionsToTxnRequest;
+import org.apache.kafka.common.requests.AddPartitionsToTxnResponse;
+import org.apache.kafka.common.requests.EndTxnRequest;
+import org.apache.kafka.common.requests.EndTxnResponse;
+import org.apache.kafka.common.requests.FindCoordinatorRequest;
+import org.apache.kafka.common.requests.FindCoordinatorResponse;
+import org.apache.kafka.common.requests.InitProducerIdRequest;
+import org.apache.kafka.common.requests.InitProducerIdResponse;
+import org.apache.kafka.common.requests.ProduceRequest;
+import org.apache.kafka.common.requests.ProduceResponse;
+import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse;
+import org.apache.kafka.common.requests.TransactionResult;
+import org.apache.kafka.common.requests.TxnOffsetCommitRequest;
+import org.apache.kafka.common.requests.TxnOffsetCommitResponse;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Random;
+
+import static org.apache.kafka.common.record.RecordBatch.NO_PRODUCER_EPOCH;
+import static org.apache.kafka.common.record.RecordBatch.NO_PRODUCER_ID;
+
+/**
+ * A reduced functionality of a combination of transaction coordinator and group coordinator.
+ * It provides basic event handling from {@link Sender} with transaction turned on.
+ *
+ * Random fault injection is supported as well, which will return a retriable error to
+ * the client.
+ */
+class TransactionSimulationCoordinator {
+
+    private final Map<TopicPartition, List<Record>> pendingPartitionData;
+    private final Map<TopicPartition, Long> pendingOffsets;
+    private boolean offsetsAddedToTxn = false;
+
+    private long currentProducerId = 0L;
+    private short currentEpoch = 0;
+    private Random faultInjectRandom = new Random();
+
+    public Map<TopicPartition, List<Record>> persistentPartitionData() {
+        return persistentPartitionData;
+    }
+
+    public Map<TopicPartition, Long> committedOffsets() {
+        return committedOffsets;
+    }
+
+    private final Map<TopicPartition, List<Record>> persistentPartitionData;
+    private final Map<TopicPartition, Long> committedOffsets;
+
+    private final MockClient networkClient;
+    private final int throttleTimeMs = 10;
+
+    TransactionSimulationCoordinator(MockClient networkClient) {
+        this.networkClient = networkClient;
+        this.pendingPartitionData = new HashMap<>();
+        this.pendingOffsets = new HashMap<>();
+        this.persistentPartitionData = new HashMap<>();
+        this.committedOffsets = new HashMap<>();
+    }
+
+    void runOnce(boolean dropMessage) {
+        Queue<ClientRequest> incomingRequests = networkClient.requests();
+
+        final boolean faultInject = faultInjectRandom.nextBoolean();
+
+        if (incomingRequests.peek() == null) {
+            return;
+        }
+
+        final AbstractResponse response;
+        AbstractRequest nextRequest = incomingRequests.peek().requestBuilder().build();
+        if (nextRequest instanceof FindCoordinatorRequest) {
+            response = handleFindCoordinator(faultInject);
+        } else if (nextRequest instanceof InitProducerIdRequest) {
+            response = handleInitProducerId((InitProducerIdRequest) nextRequest, faultInject);
+        } else if (nextRequest instanceof AddPartitionsToTxnRequest) {
+            response = handleAddPartitionToTxn((AddPartitionsToTxnRequest) nextRequest, faultInject);
+        } else if (nextRequest instanceof AddOffsetsToTxnRequest) {
+            response = handleAddOffsetsToTxn((AddOffsetsToTxnRequest) nextRequest, faultInject);
+        } else if (nextRequest instanceof TxnOffsetCommitRequest) {
+            response = handleTxnCommit((TxnOffsetCommitRequest) nextRequest, faultInject);
+        } else if (nextRequest instanceof ProduceRequest) {
+            response = handleProduce((ProduceRequest) nextRequest, faultInject);
+        } else if (nextRequest instanceof EndTxnRequest) {
+            response = handleEndTxn((EndTxnRequest) nextRequest, faultInject);
+        } else {
+            throw new IllegalArgumentException("Unknown request: " + nextRequest);
+        }
+
+        networkClient.respond(response, dropMessage);
+    }
+
+    private FindCoordinatorResponse handleFindCoordinator(final boolean faultInject) {
+        if (faultInject) {
+            return new FindCoordinatorResponse(
+                new FindCoordinatorResponseData()
+                    .setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code()));
+        } else {
+            return new FindCoordinatorResponse(
+                new FindCoordinatorResponseData()
+                    .setErrorCode(Errors.NONE.code())
+                    .setHost("localhost")
+                    .setNodeId(0)
+                    .setPort(2211)
+            );
+        }
+    }
+
+    private InitProducerIdResponse handleInitProducerId(InitProducerIdRequest request,
+                                                        final boolean faultInject) {
+        if (faultInject) {
+            return new InitProducerIdResponse(
+                new InitProducerIdResponseData()
+                    .setErrorCode(Errors.NOT_COORDINATOR.code())
+            );
+        } else if (request.data.producerId() != NO_PRODUCER_ID &&
+                    request.data.producerId() != currentProducerId) {
+            return new InitProducerIdResponse(
+                new InitProducerIdResponseData()
+                    .setErrorCode(Errors.UNKNOWN_PRODUCER_ID.code())
+                    .setThrottleTimeMs(throttleTimeMs)
+            );
+        } else if (request.data.producerEpoch() != NO_PRODUCER_EPOCH &&
+                    request.data.producerEpoch() != currentEpoch) {
+            return new InitProducerIdResponse(
+                new InitProducerIdResponseData()
+                    .setErrorCode(Errors.INVALID_PRODUCER_EPOCH.code())
+                    .setThrottleTimeMs(throttleTimeMs)
+            );
+        } else {
+            currentProducerId += 1;
+            currentEpoch += 1;
+            return new InitProducerIdResponse(
+                new InitProducerIdResponseData()
+                    .setProducerId(currentProducerId)
+                    .setProducerEpoch(currentEpoch)
+                    .setErrorCode(Errors.NONE.code())
+            );
+        }
+    }
+
+    private AddPartitionsToTxnResponse handleAddPartitionToTxn(AddPartitionsToTxnRequest request,
+                                                               final boolean faultInject) {
+        Map<TopicPartition, Errors> errors = new HashMap<>();
+        request.partitions().forEach(topicPartition -> {
+            if (faultInject) {
+                errors.put(topicPartition, Errors.COORDINATOR_NOT_AVAILABLE);
+            } else if (request.data.producerId() != currentProducerId) {
+                errors.put(topicPartition, Errors.UNKNOWN_PRODUCER_ID);
+            } else if (request.data.producerEpoch() != currentEpoch) {
+                errors.put(topicPartition, Errors.INVALID_PRODUCER_EPOCH);
+            } else {
+                errors.put(topicPartition, Errors.NONE);
+            }
+        });
+
+        return new AddPartitionsToTxnResponse(
+            throttleTimeMs,
+            errors
+        );
+    }
+
+    private AddOffsetsToTxnResponse handleAddOffsetsToTxn(AddOffsetsToTxnRequest request,
+                                                          final boolean faultInject) {
+        if (faultInject) {
+            return new AddOffsetsToTxnResponse(
+                new AddOffsetsToTxnResponseData()
+                    .setErrorCode(Errors.COORDINATOR_LOAD_IN_PROGRESS.code())
+                    .setThrottleTimeMs(throttleTimeMs)
+            );
+        } else if (request.data.producerId() != currentProducerId) {
+            return new AddOffsetsToTxnResponse(
+                new AddOffsetsToTxnResponseData()
+                    .setErrorCode(Errors.UNKNOWN_PRODUCER_ID.code())
+                    .setThrottleTimeMs(throttleTimeMs)
+            );
+        } else if (request.data.producerEpoch() != currentEpoch) {
+            return new AddOffsetsToTxnResponse(
+                new AddOffsetsToTxnResponseData()
+                    .setErrorCode(Errors.INVALID_PRODUCER_EPOCH.code())
+                    .setThrottleTimeMs(throttleTimeMs)
+            );
+        } else {
+            offsetsAddedToTxn = true;
+            return new AddOffsetsToTxnResponse(
+                new AddOffsetsToTxnResponseData()
+                    .setErrorCode(Errors.NONE.code())
+                    .setThrottleTimeMs(throttleTimeMs)
+            );
+        }
+    }
+
+    private AbstractResponse handleTxnCommit(TxnOffsetCommitRequest request,
+                                             final boolean faultInject) {
+        Map<TopicPartition, Errors> errors = new HashMap<>();
+        request.data.topics().forEach(topic -> topic.partitions().forEach(partition -> {
+            TopicPartition key = new TopicPartition(topic.name(), partition.partitionIndex());
+            if (faultInject) {
+                errors.put(key, Errors.COORDINATOR_LOAD_IN_PROGRESS);
+            } else if (request.data.producerId() != currentProducerId) {
+                errors.put(key, Errors.UNKNOWN_PRODUCER_ID);
+            } else if (request.data.producerEpoch() != currentEpoch) {
+                errors.put(key, Errors.INVALID_PRODUCER_EPOCH);
+            } else if (offsetsAddedToTxn) {
+                pendingOffsets.put(key, partition.committedOffset());
+                errors.put(key, Errors.NONE);
+            } else {
+                errors.put(key, Errors.UNKNOWN_TOPIC_OR_PARTITION);
+            }
+        }));
+
+        return new TxnOffsetCommitResponse(
+            throttleTimeMs,
+            errors
+        );
+    }
+
+    private AbstractResponse handleProduce(ProduceRequest request,
+                                           final boolean faultInject) {
+        Map<TopicPartition, PartitionResponse> errors = new HashMap<>();
+        Map<TopicPartition, MemoryRecords> partitionRecords = request.partitionRecordsOrFail();
+
+        partitionRecords.forEach((topicPartition, records) -> {
+            if (faultInject) {
+                // Trigger KIP-360 path.
+                errors.put(topicPartition, new PartitionResponse(Errors.UNKNOWN_PRODUCER_ID));
+            } else {
+                List<Record> sentRecords = pendingPartitionData.getOrDefault(topicPartition, new ArrayList<>());

Review comment:
       I made an attempt at that, but since we don't have a retry mechanism yet, it is hard to assume that `AddPartition` has succeeded before the produce request is sent.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] abbccdda closed pull request #8725: KAFKA-9608: Transaction Event Simulation Test

Posted by GitBox <gi...@apache.org>.
abbccdda closed pull request #8725:
URL: https://github.com/apache/kafka/pull/8725


   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] abbccdda commented on pull request #8725: KAFKA-9608: Transaction Event Simulation Test

Posted by GitBox <gi...@apache.org>.
abbccdda commented on pull request #8725:
URL: https://github.com/apache/kafka/pull/8725#issuecomment-647731489


   retest this please


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] abbccdda commented on a change in pull request #8725: KAFKA-9608: Transaction Event Simulation Test

Posted by GitBox <gi...@apache.org>.
abbccdda commented on a change in pull request #8725:
URL: https://github.com/apache/kafka/pull/8725#discussion_r491066687



##########
File path: clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionEventSimulationTest.java
##########
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.producer.internals;
+
+import org.apache.kafka.clients.ApiVersions;
+import org.apache.kafka.clients.MockClient;
+import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.internals.ClusterResourceListeners;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.record.Record;
+import org.apache.kafka.common.requests.FindCoordinatorRequest;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.test.TestUtils;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.Random;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * This test tries to test out the EOS robustness on the client side. It features a {@link TransactionSimulationCoordinator}
+ * which handles the incoming transactional produce/metadata requests and gives basic feedback through {@link MockClient}.
+ *
+ * Each iteration the transaction manager will append one record through accumulator and commit offset at the same time. The request
+ * being transmitted is not guaranteed to be processed or processed correctly, so a state checking loop is enforced to make the client
+ * and the coordinator interact with each other and ensure the state could be eventually clean using {@link TransactionManager#isReady}.
+ * By the end of the test we will check whether all the committed transactions are successfully materialized on the coordinator side.
+ *
+ * Features supported:
+ * 
+ * 1. Randomly abort transaction
+ * 2. Fault injection on response
+ * 3. Random message drop
+ */
+public class TransactionEventSimulationTest {
+
+    private TransactionManager transactionManager;
+    private TransactionSimulationCoordinator transactionCoordinator;
+    private Sender sender;
+    private final LogContext logContext = new LogContext();
+
+    private final MockTime time = new MockTime();
+    private final int requestTimeoutMs = 100;
+    private final int retryBackOffMs = 0;
+    private final long apiVersion = 0L;
+
+    private ProducerMetadata metadata = new ProducerMetadata(0, Long.MAX_VALUE, 10,
+        new LogContext(), new ClusterResourceListeners(), time);
+    private MockClient client = new MockClient(time, metadata);
+
+    @Before
+    public void setup() {
+        transactionManager = new TransactionManager(logContext, "txn-id",
+            requestTimeoutMs, apiVersion, new ApiVersions(), false);
+        transactionCoordinator = new TransactionSimulationCoordinator(client);
+    }
+
+    @Test
+    public void simulateTxnEvents() throws InterruptedException {
+        final int batchSize = 100;
+        final int lingerMs = 0;
+        final int deliveryTimeoutMs = 10;
+
+        RecordAccumulator accumulator = new RecordAccumulator(logContext, batchSize, CompressionType.GZIP,
+            lingerMs, retryBackOffMs, deliveryTimeoutMs, new Metrics(), "accumulator", time, new ApiVersions(), transactionManager,
+            new BufferPool(1000, 100, new Metrics(), time, "producer-internal-metrics"));
+
+        metadata.add("topic", time.milliseconds());
+        metadata.update(metadata.newMetadataRequestAndVersion(time.milliseconds()).requestVersion,
+            TestUtils.metadataUpdateWith(1, Collections.singletonMap("topic", 2)), true, time.milliseconds());
+
+        sender = new Sender(logContext, client, metadata, accumulator, false, 100, (short) 1,
+            Integer.MAX_VALUE, new SenderMetricsRegistry(new Metrics()), time, requestTimeoutMs, 10, transactionManager, new ApiVersions());
+
+        transactionManager.initializeTransactions();
+        sender.runOnce();
+        resolvePendingRequests();
+        final int numTransactions = 100;
+
+        TopicPartition key = new TopicPartition("topic", 0);
+        long committedOffsets = 0L;
+        Random abortTxn = new Random();
+        client.prepareMetadataUpdate(TestUtils.metadataUpdateWith(1, Collections.singletonMap("topic", 2)));
+        final long timestamp = 0L;
+        final int maxBlockTime = 0;
+
+        for (int i = 0; i < numTransactions; i++) {
+            transactionManager.beginTransaction();
+            transactionManager.maybeAddPartitionToTransaction(key);
+            accumulator.append(key, timestamp, new byte[1], new byte[1],
+                Record.EMPTY_HEADERS, null, maxBlockTime, false, time.milliseconds());
+            transactionManager.sendOffsetsToTransaction(
+                Collections.singletonMap(key, new OffsetAndMetadata(committedOffsets)),
+                new ConsumerGroupMetadata("group"));
+
+            if (abortTxn.nextBoolean()) {
+                transactionManager.beginCommit();
+                committedOffsets += 1;
+            } else {
+                transactionManager.beginAbort();
+            }
+
+            resolvePendingRequests();
+        }
+
+        assertTrue(transactionCoordinator.persistentPartitionData().containsKey(key));
+        assertTrue(transactionCoordinator.committedOffsets().containsKey(key));
+        assertEquals(committedOffsets - 1, (long) transactionCoordinator.committedOffsets().get(key));
+    }
+
+    private void resolvePendingRequests() {
+        Random dropMessageRandom = new Random();

Review comment:
       If we use a fixed seed, `nextBoolean()` will always end up returning either true or false. We may consider using something like `nextInt() % 2` instead.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] abbccdda commented on a change in pull request #8725: KAFKA-9608: Transaction Event Simulation Test

Posted by GitBox <gi...@apache.org>.
abbccdda commented on a change in pull request #8725:
URL: https://github.com/apache/kafka/pull/8725#discussion_r491195879



##########
File path: clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionSimulationCoordinator.java
##########
@@ -0,0 +1,316 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.producer.internals;
+
+import org.apache.kafka.clients.ClientRequest;
+import org.apache.kafka.clients.MockClient;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.message.AddOffsetsToTxnResponseData;
+import org.apache.kafka.common.message.EndTxnResponseData;
+import org.apache.kafka.common.message.FindCoordinatorResponseData;
+import org.apache.kafka.common.message.InitProducerIdResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.common.record.Record;
+import org.apache.kafka.common.requests.AbstractRequest;
+import org.apache.kafka.common.requests.AbstractResponse;
+import org.apache.kafka.common.requests.AddOffsetsToTxnRequest;
+import org.apache.kafka.common.requests.AddOffsetsToTxnResponse;
+import org.apache.kafka.common.requests.AddPartitionsToTxnRequest;
+import org.apache.kafka.common.requests.AddPartitionsToTxnResponse;
+import org.apache.kafka.common.requests.EndTxnRequest;
+import org.apache.kafka.common.requests.EndTxnResponse;
+import org.apache.kafka.common.requests.FindCoordinatorRequest;
+import org.apache.kafka.common.requests.FindCoordinatorResponse;
+import org.apache.kafka.common.requests.InitProducerIdRequest;
+import org.apache.kafka.common.requests.InitProducerIdResponse;
+import org.apache.kafka.common.requests.ProduceRequest;
+import org.apache.kafka.common.requests.ProduceResponse;
+import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse;
+import org.apache.kafka.common.requests.TransactionResult;
+import org.apache.kafka.common.requests.TxnOffsetCommitRequest;
+import org.apache.kafka.common.requests.TxnOffsetCommitResponse;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Random;
+
+import static org.apache.kafka.common.record.RecordBatch.NO_PRODUCER_EPOCH;
+import static org.apache.kafka.common.record.RecordBatch.NO_PRODUCER_ID;
+
+/**
+ * A reduced functionality of a combination of transaction coordinator and group coordinator.
+ * It provides basic event handling from {@link Sender} with transaction turned on.
+ *
+ * Random fault injection is supported as well, which will return a retriable error to
+ * the client.
+ */
+class TransactionSimulationCoordinator {
+
+    private final Map<TopicPartition, List<Record>> pendingPartitionData;
+    private final Map<TopicPartition, Long> pendingOffsets;
+    private boolean offsetsAddedToTxn = false;
+
+    private long currentProducerId = 0L;
+    private short currentEpoch = 0;
+    private Random faultInjectRandom = new Random();
+
+    public Map<TopicPartition, List<Record>> persistentPartitionData() {
+        return persistentPartitionData;
+    }
+
+    public Map<TopicPartition, Long> committedOffsets() {
+        return committedOffsets;
+    }
+
+    private final Map<TopicPartition, List<Record>> persistentPartitionData;
+    private final Map<TopicPartition, Long> committedOffsets;
+
+    private final MockClient networkClient;
+    private final int throttleTimeMs = 10;
+
+    TransactionSimulationCoordinator(MockClient networkClient) {
+        this.networkClient = networkClient;
+        this.pendingPartitionData = new HashMap<>();
+        this.pendingOffsets = new HashMap<>();
+        this.persistentPartitionData = new HashMap<>();
+        this.committedOffsets = new HashMap<>();
+    }
+
+    void runOnce(boolean dropMessage) {
+        Queue<ClientRequest> incomingRequests = networkClient.requests();
+
+        final boolean faultInject = faultInjectRandom.nextBoolean();
+
+        if (incomingRequests.peek() == null) {
+            return;
+        }
+
+        final AbstractResponse response;
+        AbstractRequest nextRequest = incomingRequests.peek().requestBuilder().build();
+        if (nextRequest instanceof FindCoordinatorRequest) {
+            response = handleFindCoordinator(faultInject);
+        } else if (nextRequest instanceof InitProducerIdRequest) {
+            response = handleInitProducerId((InitProducerIdRequest) nextRequest, faultInject);
+        } else if (nextRequest instanceof AddPartitionsToTxnRequest) {
+            response = handleAddPartitionToTxn((AddPartitionsToTxnRequest) nextRequest, faultInject);
+        } else if (nextRequest instanceof AddOffsetsToTxnRequest) {
+            response = handleAddOffsetsToTxn((AddOffsetsToTxnRequest) nextRequest, faultInject);
+        } else if (nextRequest instanceof TxnOffsetCommitRequest) {
+            response = handleTxnCommit((TxnOffsetCommitRequest) nextRequest, faultInject);
+        } else if (nextRequest instanceof ProduceRequest) {
+            response = handleProduce((ProduceRequest) nextRequest, faultInject);
+        } else if (nextRequest instanceof EndTxnRequest) {
+            response = handleEndTxn((EndTxnRequest) nextRequest, faultInject);
+        } else {
+            throw new IllegalArgumentException("Unknown request: " + nextRequest);
+        }
+
+        networkClient.respond(response, dropMessage);
+    }
+
+    private FindCoordinatorResponse handleFindCoordinator(final boolean faultInject) {
+        if (faultInject) {
+            return new FindCoordinatorResponse(
+                new FindCoordinatorResponseData()
+                    .setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code()));
+        } else {
+            return new FindCoordinatorResponse(
+                new FindCoordinatorResponseData()
+                    .setErrorCode(Errors.NONE.code())
+                    .setHost("localhost")
+                    .setNodeId(0)
+                    .setPort(2211)
+            );
+        }
+    }
+
+    private InitProducerIdResponse handleInitProducerId(InitProducerIdRequest request,
+                                                        final boolean faultInject) {
+        if (faultInject) {
+            return new InitProducerIdResponse(
+                new InitProducerIdResponseData()
+                    .setErrorCode(Errors.NOT_COORDINATOR.code())
+            );
+        } else if (request.data.producerId() != NO_PRODUCER_ID &&
+                    request.data.producerId() != currentProducerId) {
+            return new InitProducerIdResponse(
+                new InitProducerIdResponseData()
+                    .setErrorCode(Errors.UNKNOWN_PRODUCER_ID.code())
+                    .setThrottleTimeMs(throttleTimeMs)
+            );
+        } else if (request.data.producerEpoch() != NO_PRODUCER_EPOCH &&
+                    request.data.producerEpoch() != currentEpoch) {
+            return new InitProducerIdResponse(
+                new InitProducerIdResponseData()
+                    .setErrorCode(Errors.INVALID_PRODUCER_EPOCH.code())
+                    .setThrottleTimeMs(throttleTimeMs)
+            );
+        } else {
+            currentProducerId += 1;
+            currentEpoch += 1;
+            return new InitProducerIdResponse(
+                new InitProducerIdResponseData()
+                    .setProducerId(currentProducerId)
+                    .setProducerEpoch(currentEpoch)
+                    .setErrorCode(Errors.NONE.code())
+            );
+        }
+    }
+
+    private AddPartitionsToTxnResponse handleAddPartitionToTxn(AddPartitionsToTxnRequest request,
+                                                               final boolean faultInject) {
+        Map<TopicPartition, Errors> errors = new HashMap<>();
+        request.partitions().forEach(topicPartition -> {
+            if (faultInject) {
+                errors.put(topicPartition, Errors.COORDINATOR_NOT_AVAILABLE);
+            } else if (request.data.producerId() != currentProducerId) {
+                errors.put(topicPartition, Errors.UNKNOWN_PRODUCER_ID);
+            } else if (request.data.producerEpoch() != currentEpoch) {
+                errors.put(topicPartition, Errors.INVALID_PRODUCER_EPOCH);
+            } else {
+                errors.put(topicPartition, Errors.NONE);
+            }
+        });
+
+        return new AddPartitionsToTxnResponse(
+            throttleTimeMs,
+            errors
+        );
+    }
+
+    private AddOffsetsToTxnResponse handleAddOffsetsToTxn(AddOffsetsToTxnRequest request,
+                                                          final boolean faultInject) {
+        if (faultInject) {
+            return new AddOffsetsToTxnResponse(
+                new AddOffsetsToTxnResponseData()
+                    .setErrorCode(Errors.COORDINATOR_LOAD_IN_PROGRESS.code())
+                    .setThrottleTimeMs(throttleTimeMs)
+            );
+        } else if (request.data.producerId() != currentProducerId) {
+            return new AddOffsetsToTxnResponse(
+                new AddOffsetsToTxnResponseData()
+                    .setErrorCode(Errors.UNKNOWN_PRODUCER_ID.code())
+                    .setThrottleTimeMs(throttleTimeMs)
+            );
+        } else if (request.data.producerEpoch() != currentEpoch) {
+            return new AddOffsetsToTxnResponse(
+                new AddOffsetsToTxnResponseData()
+                    .setErrorCode(Errors.INVALID_PRODUCER_EPOCH.code())
+                    .setThrottleTimeMs(throttleTimeMs)
+            );
+        } else {
+            offsetsAddedToTxn = true;
+            return new AddOffsetsToTxnResponse(
+                new AddOffsetsToTxnResponseData()
+                    .setErrorCode(Errors.NONE.code())
+                    .setThrottleTimeMs(throttleTimeMs)
+            );
+        }
+    }
+
+    private AbstractResponse handleTxnCommit(TxnOffsetCommitRequest request,
+                                             final boolean faultInject) {
+        Map<TopicPartition, Errors> errors = new HashMap<>();
+        request.data.topics().forEach(topic -> topic.partitions().forEach(partition -> {
+            TopicPartition key = new TopicPartition(topic.name(), partition.partitionIndex());
+            if (faultInject) {
+                errors.put(key, Errors.COORDINATOR_LOAD_IN_PROGRESS);
+            } else if (request.data.producerId() != currentProducerId) {
+                errors.put(key, Errors.UNKNOWN_PRODUCER_ID);
+            } else if (request.data.producerEpoch() != currentEpoch) {
+                errors.put(key, Errors.INVALID_PRODUCER_EPOCH);
+            } else if (offsetsAddedToTxn) {
+                pendingOffsets.put(key, partition.committedOffset());
+                errors.put(key, Errors.NONE);
+            } else {
+                errors.put(key, Errors.UNKNOWN_TOPIC_OR_PARTITION);
+            }
+        }));
+
+        return new TxnOffsetCommitResponse(
+            throttleTimeMs,
+            errors
+        );
+    }
+
+    private AbstractResponse handleProduce(ProduceRequest request,
+                                           final boolean faultInject) {
+        Map<TopicPartition, PartitionResponse> errors = new HashMap<>();
+        Map<TopicPartition, MemoryRecords> partitionRecords = request.partitionRecordsOrFail();
+
+        partitionRecords.forEach((topicPartition, records) -> {
+            if (faultInject) {
+                // Trigger KIP-360 path.
+                errors.put(topicPartition, new PartitionResponse(Errors.UNKNOWN_PRODUCER_ID));
+            } else {
+                List<Record> sentRecords = pendingPartitionData.getOrDefault(topicPartition, new ArrayList<>());
+                for (Record partitionRecord  : records.records()) {
+                    sentRecords.add(partitionRecord);
+                }
+
+                pendingPartitionData.put(topicPartition, sentRecords);
+                errors.put(topicPartition, new PartitionResponse(Errors.NONE));
+            }
+        });
+
+        return new ProduceResponse(errors, throttleTimeMs);
+    }
+
+    private EndTxnResponse handleEndTxn(EndTxnRequest request, final boolean faultInject) {

Review comment:
       I thought we already have that coverage on L126:
   ```
           networkClient.respond(response, disconnect);
   ```
   




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] abbccdda commented on a change in pull request #8725: KAFKA-9608: Transaction Event Simulation Test

Posted by GitBox <gi...@apache.org>.
abbccdda commented on a change in pull request #8725:
URL: https://github.com/apache/kafka/pull/8725#discussion_r490713327



##########
File path: clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionSimulationCoordinator.java
##########
@@ -0,0 +1,316 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.producer.internals;
+
+import org.apache.kafka.clients.ClientRequest;
+import org.apache.kafka.clients.MockClient;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.message.AddOffsetsToTxnResponseData;
+import org.apache.kafka.common.message.EndTxnResponseData;
+import org.apache.kafka.common.message.FindCoordinatorResponseData;
+import org.apache.kafka.common.message.InitProducerIdResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.common.record.Record;
+import org.apache.kafka.common.requests.AbstractRequest;
+import org.apache.kafka.common.requests.AbstractResponse;
+import org.apache.kafka.common.requests.AddOffsetsToTxnRequest;
+import org.apache.kafka.common.requests.AddOffsetsToTxnResponse;
+import org.apache.kafka.common.requests.AddPartitionsToTxnRequest;
+import org.apache.kafka.common.requests.AddPartitionsToTxnResponse;
+import org.apache.kafka.common.requests.EndTxnRequest;
+import org.apache.kafka.common.requests.EndTxnResponse;
+import org.apache.kafka.common.requests.FindCoordinatorRequest;
+import org.apache.kafka.common.requests.FindCoordinatorResponse;
+import org.apache.kafka.common.requests.InitProducerIdRequest;
+import org.apache.kafka.common.requests.InitProducerIdResponse;
+import org.apache.kafka.common.requests.ProduceRequest;
+import org.apache.kafka.common.requests.ProduceResponse;
+import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse;
+import org.apache.kafka.common.requests.TransactionResult;
+import org.apache.kafka.common.requests.TxnOffsetCommitRequest;
+import org.apache.kafka.common.requests.TxnOffsetCommitResponse;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Random;
+
+import static org.apache.kafka.common.record.RecordBatch.NO_PRODUCER_EPOCH;
+import static org.apache.kafka.common.record.RecordBatch.NO_PRODUCER_ID;
+
+/**
+ * A reduced functionality of a combination of transaction coordinator and group coordinator.
+ * It provides basic event handling from {@link Sender} with transaction turned on.
+ *
+ * Random fault injection is supported as well, which will return a retriable error to
+ * the client.
+ */
+class TransactionSimulationCoordinator {
+
+    private final Map<TopicPartition, List<Record>> pendingPartitionData;
+    private final Map<TopicPartition, Long> pendingOffsets;
+    private boolean offsetsAddedToTxn = false;
+
+    private long currentProducerId = 0L;
+    private short currentEpoch = 0;
+    private Random faultInjectRandom = new Random();
+
+    public Map<TopicPartition, List<Record>> persistentPartitionData() {
+        return persistentPartitionData;
+    }
+
+    public Map<TopicPartition, Long> committedOffsets() {
+        return committedOffsets;
+    }
+
+    private final Map<TopicPartition, List<Record>> persistentPartitionData;
+    private final Map<TopicPartition, Long> committedOffsets;
+
+    private final MockClient networkClient;
+    private final int throttleTimeMs = 10;
+
+    TransactionSimulationCoordinator(MockClient networkClient) {
+        this.networkClient = networkClient;
+        this.pendingPartitionData = new HashMap<>();
+        this.pendingOffsets = new HashMap<>();
+        this.persistentPartitionData = new HashMap<>();
+        this.committedOffsets = new HashMap<>();
+    }
+
+    void runOnce(boolean dropMessage) {

Review comment:
       So you are suggesting `maybeDisconnect`?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org