You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2017/05/09 17:15:43 UTC
kafka git commit: KAFKA-5126: Implement KIP-98 transactional methods
in the MockProducer
Repository: kafka
Updated Branches:
refs/heads/trunk 5b36adde4 -> c69842336
KAFKA-5126: Implement KIP-98 transactional methods in the MockProducer
Author: Matthias J. Sax <ma...@confluent.io>
Reviewers: Damian Guy, Guozhang Wang
Closes #2951 from mjsax/kafka-5126-add-transactions-to-mock-producer
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/c6984233
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/c6984233
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/c6984233
Branch: refs/heads/trunk
Commit: c69842336d87cc321a58171c517c46cdddfe1a64
Parents: 5b36add
Author: Matthias J. Sax <ma...@confluent.io>
Authored: Tue May 9 10:15:40 2017 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Tue May 9 10:15:40 2017 -0700
----------------------------------------------------------------------
.../kafka/clients/producer/MockProducer.java | 164 ++++++-
.../clients/producer/MockProducerTest.java | 468 ++++++++++++++++++-
2 files changed, 592 insertions(+), 40 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/c6984233/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
index 1b4151c..15ea454 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
@@ -51,12 +51,20 @@ public class MockProducer<K, V> implements Producer<K, V> {
private final Cluster cluster;
private final Partitioner partitioner;
private final List<ProducerRecord<K, V>> sent;
+ private final List<ProducerRecord<K, V>> uncommittedSends;
private final Deque<Completion> completions;
- private boolean autoComplete;
- private Map<TopicPartition, Long> offsets;
- private boolean closed;
+ private final Map<TopicPartition, Long> offsets;
+ private final List<Map<String, Map<TopicPartition, OffsetAndMetadata>>> consumerGroupOffsets;
+ private Map<String, Map<TopicPartition, OffsetAndMetadata>> uncommittedConsumerGroupOffsets;
private final ExtendedSerializer<K> keySerializer;
private final ExtendedSerializer<V> valueSerializer;
+ private boolean autoComplete;
+ private boolean closed;
+ private boolean transactionInitialized;
+ private boolean transactionInFlight;
+ private boolean transactionCommitted;
+ private boolean transactionAborted;
+ private boolean producerFenced;
/**
* Create a mock producer
@@ -80,9 +88,12 @@ public class MockProducer<K, V> implements Producer<K, V> {
this.partitioner = partitioner;
this.keySerializer = ensureExtended(keySerializer);
this.valueSerializer = ensureExtended(valueSerializer);
- this.offsets = new HashMap<TopicPartition, Long>();
- this.sent = new ArrayList<ProducerRecord<K, V>>();
- this.completions = new ArrayDeque<Completion>();
+ this.offsets = new HashMap<>();
+ this.sent = new ArrayList<>();
+ this.uncommittedSends = new ArrayList<>();
+ this.consumerGroupOffsets = new ArrayList<>();
+ this.uncommittedConsumerGroupOffsets = new HashMap<>();
+ this.completions = new ArrayDeque<>();
}
/**
@@ -117,29 +128,94 @@ public class MockProducer<K, V> implements Producer<K, V> {
this(Cluster.empty(), false, null, null, null);
}
- public void initTransactions() {
+ private <T> ExtendedSerializer<T> ensureExtended(Serializer<T> serializer) {
+ return serializer instanceof ExtendedSerializer ? (ExtendedSerializer<T>) serializer : new ExtendedSerializer.Wrapper<>(serializer);
+ }
+ @Override
+ public void initTransactions() {
+ verifyProducerState();
+ if (this.transactionInitialized) {
+ throw new IllegalStateException("MockProducer has already been initialized for transactions.");
+ }
+ this.transactionInitialized = true;
}
+ @Override
public void beginTransaction() throws ProducerFencedException {
-
+ verifyProducerState();
+ verifyTransactionsInitialized();
+ this.transactionInFlight = true;
+ this.transactionCommitted = false;
+ this.transactionAborted = false;
}
+ @Override
public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets,
- String consumerGroupId) throws ProducerFencedException {
-
+ String consumerGroupId) throws ProducerFencedException {
+ verifyProducerState();
+ verifyTransactionsInitialized();
+ verifyNoTransactionInFlight();
+ Map<TopicPartition, OffsetAndMetadata> uncommittedOffsets = this.uncommittedConsumerGroupOffsets.get(consumerGroupId);
+ if (uncommittedOffsets == null) {
+ uncommittedOffsets = new HashMap<>();
+ this.uncommittedConsumerGroupOffsets.put(consumerGroupId, uncommittedOffsets);
+ }
+ uncommittedOffsets.putAll(offsets);
}
+ @Override
public void commitTransaction() throws ProducerFencedException {
+ verifyProducerState();
+ verifyTransactionsInitialized();
+ verifyNoTransactionInFlight();
+
+ flush();
+
+ this.sent.addAll(this.uncommittedSends);
+ if (!this.uncommittedConsumerGroupOffsets.isEmpty())
+ this.consumerGroupOffsets.add(this.uncommittedConsumerGroupOffsets);
+
+ this.uncommittedSends.clear();
+ this.uncommittedConsumerGroupOffsets = new HashMap<>();
+ this.transactionCommitted = true;
+ this.transactionAborted = false;
+ this.transactionInFlight = false;
}
+ @Override
public void abortTransaction() throws ProducerFencedException {
-
+ verifyProducerState();
+ verifyTransactionsInitialized();
+ verifyNoTransactionInFlight();
+ flush();
+ this.uncommittedSends.clear();
+ this.uncommittedConsumerGroupOffsets.clear();
+ this.transactionCommitted = false;
+ this.transactionAborted = true;
+ this.transactionInFlight = false;
}
-
- private <T> ExtendedSerializer<T> ensureExtended(Serializer<T> serializer) {
- return serializer instanceof ExtendedSerializer ? (ExtendedSerializer<T>) serializer : new ExtendedSerializer.Wrapper<>(serializer);
+
+ private void verifyProducerState() {
+ if (this.closed) {
+ throw new IllegalStateException("MockProducer is already closed.");
+ }
+ if (this.producerFenced) {
+ throw new ProducerFencedException("MockProducer is fenced.");
+ }
+ }
+
+ private void verifyTransactionsInitialized() {
+ if (!this.transactionInitialized) {
+ throw new IllegalStateException("MockProducer hasn't been initialized for transactions.");
+ }
+ }
+
+ private void verifyNoTransactionInFlight() {
+ if (!this.transactionInFlight) {
+ throw new IllegalStateException("There is no open transaction.");
+ }
}
/**
@@ -159,6 +235,7 @@ public class MockProducer<K, V> implements Producer<K, V> {
*/
@Override
public synchronized Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
+ verifyProducerState();
int partition = 0;
if (!this.cluster.partitionsForTopic(record.topic()).isEmpty())
partition = partition(record, this.cluster);
@@ -169,11 +246,17 @@ public class MockProducer<K, V> implements Producer<K, V> {
Completion completion = new Completion(offset,
new RecordMetadata(topicPartition, 0, offset, RecordBatch.NO_TIMESTAMP, 0, 0, 0),
result, callback);
- this.sent.add(record);
+
+ if (!this.transactionInFlight)
+ this.sent.add(record);
+ else
+ this.uncommittedSends.add(record);
+
if (autoComplete)
completion.complete(null);
else
this.completions.addLast(completion);
+
return future;
}
@@ -212,29 +295,68 @@ public class MockProducer<K, V> implements Producer<K, V> {
@Override
public void close(long timeout, TimeUnit timeUnit) {
- if (closed) {
- throw new IllegalStateException("MockedProducer is already closed.");
+ if (this.closed) {
+ throw new IllegalStateException("MockProducer is already closed.");
}
- closed = true;
+ if (transactionInFlight)
+ abortTransaction();
+ this.closed = true;
}
public boolean closed() {
- return closed;
+ return this.closed;
+ }
+
+ public void fenceProducer() {
+ verifyProducerState();
+ verifyTransactionsInitialized();
+ this.producerFenced = true;
+ }
+
+ public boolean transactionInitialized() {
+ return this.transactionInitialized;
+ }
+
+ public boolean transactionInFlight() {
+ return this.transactionInFlight;
+ }
+
+ public boolean transactionCommitted() {
+ return this.transactionCommitted;
+ }
+
+ public boolean transactionAborted() {
+ return this.transactionAborted;
}
/**
* Get the list of sent records since the last call to {@link #clear()}
*/
public synchronized List<ProducerRecord<K, V>> history() {
- return new ArrayList<ProducerRecord<K, V>>(this.sent);
+ return new ArrayList<>(this.sent);
}
/**
- * Clear the stored history of sent records
+ * Get the list of committed consumer group offsets since the last call to {@link #clear()}
+ */
+ public synchronized List<Map<String, Map<TopicPartition, OffsetAndMetadata>>> consumerGroupOffsetsHistory() {
+ return new ArrayList<>(this.consumerGroupOffsets);
+ }
+ /**
+ *
+ * Clear the stored history of sent records, consumer group offsets, and transactional state
*/
public synchronized void clear() {
this.sent.clear();
+ this.uncommittedSends.clear();
this.completions.clear();
+ this.consumerGroupOffsets.clear();
+ this.uncommittedConsumerGroupOffsets.clear();
+ this.transactionInitialized = false;
+ this.transactionInFlight = false;
+ this.transactionCommitted = false;
+ this.transactionAborted = false;
+ this.producerFenced = false;
}
/**
http://git-wip-us.apache.org/repos/asf/kafka/blob/c6984233/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java
index a29b881..468ea49 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java
@@ -16,41 +16,51 @@
*/
package org.apache.kafka.clients.producer;
-import static java.util.Arrays.asList;
-import static java.util.Collections.singletonList;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.internals.DefaultPartitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.test.MockSerializer;
import org.junit.Test;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+import static java.util.Arrays.asList;
+import static java.util.Collections.singletonList;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
public class MockProducerTest {
- private String topic = "topic";
+ private final String topic = "topic";
+ private final MockProducer<byte[], byte[]> producer = new MockProducer<>(true, new MockSerializer(), new MockSerializer());
+ private final ProducerRecord<byte[], byte[]> record1 = new ProducerRecord<>(topic, "key1".getBytes(), "value1".getBytes());
+ private final ProducerRecord<byte[], byte[]> record2 = new ProducerRecord<>(topic, "key2".getBytes(), "value2".getBytes());
+
@Test
@SuppressWarnings("unchecked")
public void testAutoCompleteMock() throws Exception {
- MockProducer<byte[], byte[]> producer = new MockProducer<>(true, new MockSerializer(), new MockSerializer());
- ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(topic, "key".getBytes(), "value".getBytes());
- Future<RecordMetadata> metadata = producer.send(record);
+ Future<RecordMetadata> metadata = producer.send(record1);
assertTrue("Send should be immediately complete", metadata.isDone());
assertFalse("Send should be successful", isError(metadata));
assertEquals("Offset should be 0", 0L, metadata.get().offset());
assertEquals(topic, metadata.get().topic());
- assertEquals("We should have the record in our history", singletonList(record), producer.history());
+ assertEquals("We should have the record in our history", singletonList(record1), producer.history());
producer.clear();
assertEquals("Clear should erase our history", 0, producer.history().size());
}
@@ -72,8 +82,6 @@ public class MockProducerTest {
@Test
public void testManualCompletion() throws Exception {
MockProducer<byte[], byte[]> producer = new MockProducer<>(false, new MockSerializer(), new MockSerializer());
- ProducerRecord<byte[], byte[]> record1 = new ProducerRecord<>(topic, "key1".getBytes(), "value1".getBytes());
- ProducerRecord<byte[], byte[]> record2 = new ProducerRecord<>(topic, "key2".getBytes(), "value2".getBytes());
Future<RecordMetadata> md1 = producer.send(record1);
assertFalse("Send shouldn't have completed", md1.isDone());
Future<RecordMetadata> md2 = producer.send(record2);
@@ -98,6 +106,428 @@ public class MockProducerTest {
assertTrue("Requests should be completed.", md3.isDone() && md4.isDone());
}
+ @Test
+ public void shouldInitTransactions() {
+ producer.initTransactions();
+ assertTrue(producer.transactionInitialized());
+ }
+
+ @Test
+ public void shouldThrowOnInitTransactionIfProducerAlreadyInitializedForTransactions() {
+ producer.initTransactions();
+ try {
+ producer.initTransactions();
+ fail("Should have thrown as producer is already initialized");
+ } catch (IllegalStateException e) { }
+ }
+
+ @Test(expected = IllegalStateException.class)
+ public void shouldThrowOnBeginTransactionIfTransactionsNotInitialized() {
+ producer.beginTransaction();
+ }
+
+ @Test
+ public void shouldBeginTransactions() {
+ producer.initTransactions();
+ producer.beginTransaction();
+ assertTrue(producer.transactionInFlight());
+ }
+
+ @Test(expected = IllegalStateException.class)
+ public void shouldThrowOnSendOffsetsToTransactionIfTransactionsNotInitialized() {
+ producer.sendOffsetsToTransaction(null, null);
+ }
+
+ @Test
+ public void shouldThrowOnSendOffsetsToTransactionTransactionIfNoTransactionGotStarted() {
+ producer.initTransactions();
+ try {
+ producer.sendOffsetsToTransaction(null, null);
+ fail("Should have thrown as producer has no open transaction");
+ } catch (IllegalStateException e) { }
+ }
+
+ @Test(expected = IllegalStateException.class)
+ public void shouldThrowOnCommitIfTransactionsNotInitialized() {
+ producer.commitTransaction();
+ }
+
+ @Test
+ public void shouldThrowOnCommitTransactionIfNoTransactionGotStarted() {
+ producer.initTransactions();
+ try {
+ producer.commitTransaction();
+ fail("Should have thrown as producer has no open transaction");
+ } catch (IllegalStateException e) { }
+ }
+
+ @Test
+ public void shouldCommitEmptyTransaction() {
+ producer.initTransactions();
+ producer.beginTransaction();
+ producer.commitTransaction();
+ assertFalse(producer.transactionInFlight());
+ assertTrue(producer.transactionCommitted());
+ assertFalse(producer.transactionAborted());
+ }
+
+ @Test(expected = IllegalStateException.class)
+ public void shouldThrowOnAbortIfTransactionsNotInitialized() {
+ producer.abortTransaction();
+ }
+
+ @Test
+ public void shouldThrowOnAbortTransactionIfNoTransactionGotStarted() {
+ producer.initTransactions();
+ try {
+ producer.abortTransaction();
+ fail("Should have thrown as producer has no open transaction");
+ } catch (IllegalStateException e) { }
+ }
+
+ @Test
+ public void shouldAbortEmptyTransaction() {
+ producer.initTransactions();
+ producer.beginTransaction();
+ producer.abortTransaction();
+ assertFalse(producer.transactionInFlight());
+ assertTrue(producer.transactionAborted());
+ assertFalse(producer.transactionCommitted());
+ }
+
+ @Test
+ public void shouldAbortInFlightTransactionOnClose() {
+ producer.initTransactions();
+ producer.beginTransaction();
+ producer.close();
+ assertFalse(producer.transactionInFlight());
+ assertTrue(producer.transactionAborted());
+ assertFalse(producer.transactionCommitted());
+ }
+
+ @Test(expected = IllegalStateException.class)
+ public void shouldThrowFenceProducerIfTransactionsNotInitialized() {
+ producer.fenceProducer();
+ }
+
+ @Test
+ public void shouldThrowOnBeginTransactionsIfProducerGotFenced() {
+ producer.initTransactions();
+ producer.fenceProducer();
+ try {
+ producer.beginTransaction();
+ fail("Should have thrown as producer is fenced off");
+ } catch (ProducerFencedException e) { }
+ }
+
+ @Test
+ public void shouldThrowOnSendIfProducerGotFenced() {
+ producer.initTransactions();
+ producer.fenceProducer();
+ try {
+ producer.send(null);
+ fail("Should have thrown as producer is fenced off");
+ } catch (ProducerFencedException e) { }
+ }
+
+ @Test
+ public void shouldThrowOnSendOffsetsToTransactionIfProducerGotFenced() {
+ producer.initTransactions();
+ producer.fenceProducer();
+ try {
+ producer.sendOffsetsToTransaction(null, null);
+ fail("Should have thrown as producer is fenced off");
+ } catch (ProducerFencedException e) { }
+ }
+
+ @Test
+ public void shouldThrowOnCommitTransactionIfProducerGotFenced() {
+ producer.initTransactions();
+ producer.fenceProducer();
+ try {
+ producer.commitTransaction();
+ fail("Should have thrown as producer is fenced off");
+ } catch (ProducerFencedException e) { }
+ }
+
+ @Test
+ public void shouldThrowOnAbortTransactionIfProducerGotFenced() {
+ producer.initTransactions();
+ producer.fenceProducer();
+ try {
+ producer.abortTransaction();
+ fail("Should have thrown as producer is fenced off");
+ } catch (ProducerFencedException e) { }
+ }
+
+ @Test
+ public void shouldPublishMessagesOnlyAfterCommitIfTransactionsAreEnabled() {
+ producer.initTransactions();
+ producer.beginTransaction();
+
+ producer.send(record1);
+ producer.send(record2);
+
+ assertTrue(producer.history().isEmpty());
+
+ producer.commitTransaction();
+
+ List<ProducerRecord<byte[], byte[]>> expectedResult = new ArrayList<>();
+ expectedResult.add(record1);
+ expectedResult.add(record2);
+
+ assertThat(producer.history(), equalTo(expectedResult));
+ }
+
+ @Test
+ public void shouldFlushOnCommitForNonAutoCompleteIfTransactionsAreEnabled() {
+ MockProducer<byte[], byte[]> producer = new MockProducer<>(false, new MockSerializer(), new MockSerializer());
+ producer.initTransactions();
+ producer.beginTransaction();
+
+ Future<RecordMetadata> md1 = producer.send(record1);
+ Future<RecordMetadata> md2 = producer.send(record2);
+
+ assertFalse(md1.isDone());
+ assertFalse(md2.isDone());
+
+ producer.commitTransaction();
+
+ assertTrue(md1.isDone());
+ assertTrue(md2.isDone());
+ }
+
+ @Test
+ public void shouldDropMessagesOnAbortIfTransactionsAreEnabled() {
+ producer.initTransactions();
+
+ producer.beginTransaction();
+ producer.send(record1);
+ producer.send(record2);
+ producer.abortTransaction();
+ assertTrue(producer.history().isEmpty());
+
+ producer.beginTransaction();
+ producer.commitTransaction();
+ assertTrue(producer.history().isEmpty());
+ }
+
+ @Test
+ public void shouldThrowOnAbortForNonAutoCompleteIfTransactionsAreEnabled() throws Exception {
+ MockProducer<byte[], byte[]> producer = new MockProducer<>(false, new MockSerializer(), new MockSerializer());
+ producer.initTransactions();
+ producer.beginTransaction();
+
+ Future<RecordMetadata> md1 = producer.send(record1);
+ assertFalse(md1.isDone());
+
+ producer.abortTransaction();
+ assertTrue(md1.isDone());
+ }
+
+ @Test
+ public void shouldPreserveCommittedMessagesOnAbortIfTransactionsAreEnabled() {
+ producer.initTransactions();
+
+ producer.beginTransaction();
+ producer.send(record1);
+ producer.send(record2);
+ producer.commitTransaction();
+
+ producer.beginTransaction();
+ producer.abortTransaction();
+
+ List<ProducerRecord<byte[], byte[]>> expectedResult = new ArrayList<>();
+ expectedResult.add(record1);
+ expectedResult.add(record2);
+
+ assertThat(producer.history(), equalTo(expectedResult));
+ }
+
+ @Test
+ public void shouldPublishConsumerGroupOffsetsOnlyAfterCommitIfTransactionsAreEnabled() {
+ producer.initTransactions();
+ producer.beginTransaction();
+
+ String group1 = "g1";
+ Map<TopicPartition, OffsetAndMetadata> group1Commit = new HashMap<TopicPartition, OffsetAndMetadata>() {
+ {
+ put(new TopicPartition(topic, 0), new OffsetAndMetadata(42L, null));
+ put(new TopicPartition(topic, 1), new OffsetAndMetadata(73L, null));
+ }
+ };
+ String group2 = "g2";
+ Map<TopicPartition, OffsetAndMetadata> group2Commit = new HashMap<TopicPartition, OffsetAndMetadata>() {
+ {
+ put(new TopicPartition(topic, 0), new OffsetAndMetadata(101L, null));
+ put(new TopicPartition(topic, 1), new OffsetAndMetadata(21L, null));
+ }
+ };
+ producer.sendOffsetsToTransaction(group1Commit, group1);
+ producer.sendOffsetsToTransaction(group2Commit, group2);
+
+ assertTrue(producer.consumerGroupOffsetsHistory().isEmpty());
+
+ Map<String, Map<TopicPartition, OffsetAndMetadata>> expectedResult = new HashMap<>();
+ expectedResult.put(group1, group1Commit);
+ expectedResult.put(group2, group2Commit);
+
+ producer.commitTransaction();
+ assertThat(producer.consumerGroupOffsetsHistory(), equalTo(Collections.singletonList(expectedResult)));
+ }
+
+ @Test
+ public void shouldPublishLatestAndCumulativeConsumerGroupOffsetsOnlyAfterCommitIfTransactionsAreEnabled() {
+ producer.initTransactions();
+ producer.beginTransaction();
+
+ String group = "g";
+ Map<TopicPartition, OffsetAndMetadata> groupCommit1 = new HashMap<TopicPartition, OffsetAndMetadata>() {
+ {
+ put(new TopicPartition(topic, 0), new OffsetAndMetadata(42L, null));
+ put(new TopicPartition(topic, 1), new OffsetAndMetadata(73L, null));
+ }
+ };
+ Map<TopicPartition, OffsetAndMetadata> groupCommit2 = new HashMap<TopicPartition, OffsetAndMetadata>() {
+ {
+ put(new TopicPartition(topic, 1), new OffsetAndMetadata(101L, null));
+ put(new TopicPartition(topic, 2), new OffsetAndMetadata(21L, null));
+ }
+ };
+ producer.sendOffsetsToTransaction(groupCommit1, group);
+ producer.sendOffsetsToTransaction(groupCommit2, group);
+
+ assertTrue(producer.consumerGroupOffsetsHistory().isEmpty());
+
+ Map<String, Map<TopicPartition, OffsetAndMetadata>> expectedResult = new HashMap<>();
+ expectedResult.put(group, new HashMap<TopicPartition, OffsetAndMetadata>() {
+ {
+ put(new TopicPartition(topic, 0), new OffsetAndMetadata(42L, null));
+ put(new TopicPartition(topic, 1), new OffsetAndMetadata(101L, null));
+ put(new TopicPartition(topic, 2), new OffsetAndMetadata(21L, null));
+ }
+ });
+
+ producer.commitTransaction();
+ assertThat(producer.consumerGroupOffsetsHistory(), equalTo(Collections.singletonList(expectedResult)));
+ }
+
+ @Test
+ public void shouldDropConsumerGroupOffsetsOnAbortIfTransactionsAreEnabled() {
+ producer.initTransactions();
+ producer.beginTransaction();
+
+ String group = "g";
+ Map<TopicPartition, OffsetAndMetadata> groupCommit = new HashMap<TopicPartition, OffsetAndMetadata>() {
+ {
+ put(new TopicPartition(topic, 0), new OffsetAndMetadata(42L, null));
+ put(new TopicPartition(topic, 1), new OffsetAndMetadata(73L, null));
+ }
+ };
+ producer.sendOffsetsToTransaction(groupCommit, group);
+ producer.abortTransaction();
+
+ producer.beginTransaction();
+ producer.commitTransaction();
+ assertTrue(producer.consumerGroupOffsetsHistory().isEmpty());
+ }
+
+ @Test
+ public void shouldPreserveCommittedConsumerGroupsOffsetsOnAbortIfTransactionsAreEnabled() {
+ producer.initTransactions();
+ producer.beginTransaction();
+
+ String group = "g";
+ Map<TopicPartition, OffsetAndMetadata> groupCommit = new HashMap<TopicPartition, OffsetAndMetadata>() {
+ {
+ put(new TopicPartition(topic, 0), new OffsetAndMetadata(42L, null));
+ put(new TopicPartition(topic, 1), new OffsetAndMetadata(73L, null));
+ }
+ };
+ producer.sendOffsetsToTransaction(groupCommit, group);
+ producer.commitTransaction();
+
+ producer.beginTransaction();
+ producer.abortTransaction();
+
+ Map<String, Map<TopicPartition, OffsetAndMetadata>> expectedResult = new HashMap<>();
+ expectedResult.put(group, groupCommit);
+
+ assertThat(producer.consumerGroupOffsetsHistory(), equalTo(Collections.singletonList(expectedResult)));
+ }
+
+ @Test
+ public void shouldThrowOnInitTransactionIfProducerIsClosed() {
+ producer.close();
+ try {
+ producer.initTransactions();
+ fail("Should have thrown as producer is already closed");
+ } catch (IllegalStateException e) { }
+ }
+
+ @Test
+ public void shouldThrowOnSendIfProducerIsClosed() {
+ producer.close();
+ try {
+ producer.send(null);
+ fail("Should have thrown as producer is already closed");
+ } catch (IllegalStateException e) { }
+ }
+
+ @Test
+ public void shouldThrowOnBeginTransactionIfProducerIsClosed() {
+ producer.close();
+ try {
+ producer.beginTransaction();
+ fail("Should have thrown as producer is already closed");
+ } catch (IllegalStateException e) { }
+ }
+
+ @Test
+ public void shouldThrowSendOffsetsToTransactionIfProducerIsClosed() {
+ producer.close();
+ try {
+ producer.sendOffsetsToTransaction(null, null);
+ fail("Should have thrown as producer is already closed");
+ } catch (IllegalStateException e) { }
+ }
+
+ @Test
+ public void shouldThrowOnCommitTransactionIfProducerIsClosed() {
+ producer.close();
+ try {
+ producer.commitTransaction();
+ fail("Should have thrown as producer is already closed");
+ } catch (IllegalStateException e) { }
+ }
+
+ @Test
+ public void shouldThrowOnAbortTransactionIfProducerIsClosed() {
+ producer.close();
+ try {
+ producer.abortTransaction();
+ fail("Should have thrown as producer is already closed");
+ } catch (IllegalStateException e) { }
+ }
+
+ @Test
+ public void shouldThrowOnCloseIfProducerIsClosed() {
+ producer.close();
+ try {
+ producer.close();
+ fail("Should have thrown as producer is already closed");
+ } catch (IllegalStateException e) { }
+ }
+
+ @Test
+ public void shouldThrowOnFenceProducerIfProducerIsClosed() {
+ producer.close();
+ try {
+ producer.fenceProducer();
+ fail("Should have thrown as producer is already closed");
+ } catch (IllegalStateException e) { }
+ }
+
private boolean isError(Future<?> future) {
try {
future.get();