You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2017/05/09 17:15:43 UTC

kafka git commit: KAFKA-5126: Implement KIP-98 transactional methods in the MockProducer

Repository: kafka
Updated Branches:
  refs/heads/trunk 5b36adde4 -> c69842336


KAFKA-5126: Implement KIP-98 transactional methods in the MockProducer

Author: Matthias J. Sax <ma...@confluent.io>

Reviewers: Damian Guy, Guozhang Wang

Closes #2951 from mjsax/kafka-5126-add-transactions-to-mock-producer


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/c6984233
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/c6984233
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/c6984233

Branch: refs/heads/trunk
Commit: c69842336d87cc321a58171c517c46cdddfe1a64
Parents: 5b36add
Author: Matthias J. Sax <ma...@confluent.io>
Authored: Tue May 9 10:15:40 2017 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Tue May 9 10:15:40 2017 -0700

----------------------------------------------------------------------
 .../kafka/clients/producer/MockProducer.java    | 164 ++++++-
 .../clients/producer/MockProducerTest.java      | 468 ++++++++++++++++++-
 2 files changed, 592 insertions(+), 40 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/c6984233/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
index 1b4151c..15ea454 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
@@ -51,12 +51,20 @@ public class MockProducer<K, V> implements Producer<K, V> {
     private final Cluster cluster;
     private final Partitioner partitioner;
     private final List<ProducerRecord<K, V>> sent;
+    private final List<ProducerRecord<K, V>> uncommittedSends;
     private final Deque<Completion> completions;
-    private boolean autoComplete;
-    private Map<TopicPartition, Long> offsets;
-    private boolean closed;
+    private final Map<TopicPartition, Long> offsets;
+    private final List<Map<String, Map<TopicPartition, OffsetAndMetadata>>> consumerGroupOffsets;
+    private Map<String, Map<TopicPartition, OffsetAndMetadata>> uncommittedConsumerGroupOffsets;
     private final ExtendedSerializer<K> keySerializer;
     private final ExtendedSerializer<V> valueSerializer;
+    private boolean autoComplete;
+    private boolean closed;
+    private boolean transactionInitialized;
+    private boolean transactionInFlight;
+    private boolean transactionCommitted;
+    private boolean transactionAborted;
+    private boolean producerFenced;
 
     /**
      * Create a mock producer
@@ -80,9 +88,12 @@ public class MockProducer<K, V> implements Producer<K, V> {
         this.partitioner = partitioner;
         this.keySerializer = ensureExtended(keySerializer);
         this.valueSerializer = ensureExtended(valueSerializer);
-        this.offsets = new HashMap<TopicPartition, Long>();
-        this.sent = new ArrayList<ProducerRecord<K, V>>();
-        this.completions = new ArrayDeque<Completion>();
+        this.offsets = new HashMap<>();
+        this.sent = new ArrayList<>();
+        this.uncommittedSends = new ArrayList<>();
+        this.consumerGroupOffsets = new ArrayList<>();
+        this.uncommittedConsumerGroupOffsets = new HashMap<>();
+        this.completions = new ArrayDeque<>();
     }
 
     /**
@@ -117,29 +128,94 @@ public class MockProducer<K, V> implements Producer<K, V> {
         this(Cluster.empty(), false, null, null, null);
     }
 
-    public void initTransactions() {
+    private <T> ExtendedSerializer<T> ensureExtended(Serializer<T> serializer) {
+        return serializer instanceof ExtendedSerializer ? (ExtendedSerializer<T>) serializer : new ExtendedSerializer.Wrapper<>(serializer);
+    }
 
+    @Override
+    public void initTransactions() {
+        verifyProducerState();
+        if (this.transactionInitialized) {
+            throw new IllegalStateException("MockProducer has already been initialized for transactions.");
+        }
+        this.transactionInitialized = true;
     }
 
+    @Override
     public void beginTransaction() throws ProducerFencedException {
-
+        verifyProducerState();
+        verifyTransactionsInitialized();
+        this.transactionInFlight = true;
+        this.transactionCommitted = false;
+        this.transactionAborted = false;
     }
 
+    @Override
     public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets,
-                                  String consumerGroupId) throws ProducerFencedException {
-
+                                         String consumerGroupId) throws ProducerFencedException {
+        verifyProducerState();
+        verifyTransactionsInitialized();
+        verifyNoTransactionInFlight();
+        Map<TopicPartition, OffsetAndMetadata> uncommittedOffsets = this.uncommittedConsumerGroupOffsets.get(consumerGroupId);
+        if (uncommittedOffsets == null) {
+            uncommittedOffsets = new HashMap<>();
+            this.uncommittedConsumerGroupOffsets.put(consumerGroupId, uncommittedOffsets);
+        }
+        uncommittedOffsets.putAll(offsets);
     }
 
+    @Override
     public void commitTransaction() throws ProducerFencedException {
+        verifyProducerState();
+        verifyTransactionsInitialized();
+        verifyNoTransactionInFlight();
+
+        flush();
+
+        this.sent.addAll(this.uncommittedSends);
+        if (!this.uncommittedConsumerGroupOffsets.isEmpty())
+            this.consumerGroupOffsets.add(this.uncommittedConsumerGroupOffsets);
+
+        this.uncommittedSends.clear();
+        this.uncommittedConsumerGroupOffsets = new HashMap<>();
+        this.transactionCommitted = true;
+        this.transactionAborted = false;
+        this.transactionInFlight = false;
 
     }
 
+    @Override
     public void abortTransaction() throws ProducerFencedException {
-        
+        verifyProducerState();
+        verifyTransactionsInitialized();
+        verifyNoTransactionInFlight();
+        flush();
+        this.uncommittedSends.clear();
+        this.uncommittedConsumerGroupOffsets.clear();
+        this.transactionCommitted = false;
+        this.transactionAborted = true;
+        this.transactionInFlight = false;
     }
-        
-    private <T> ExtendedSerializer<T> ensureExtended(Serializer<T> serializer) {
-        return serializer instanceof ExtendedSerializer ? (ExtendedSerializer<T>) serializer : new ExtendedSerializer.Wrapper<>(serializer);
+
+    private void verifyProducerState() {
+        if (this.closed) {
+            throw new IllegalStateException("MockProducer is already closed.");
+        }
+        if (this.producerFenced) {
+            throw new ProducerFencedException("MockProducer is fenced.");
+        }
+    }
+
+    private void verifyTransactionsInitialized() {
+        if (!this.transactionInitialized) {
+            throw new IllegalStateException("MockProducer hasn't been initialized for transactions.");
+        }
+    }
+
+    private void verifyNoTransactionInFlight() { // NOTE: despite its name, this guard REQUIRES an open transaction
+        if (!this.transactionInFlight) { // flag is set by beginTransaction() and reset by commit/abort/clear()
+            throw new IllegalStateException("There is no open transaction.");
+        }
     }
 
     /**
@@ -159,6 +235,7 @@ public class MockProducer<K, V> implements Producer<K, V> {
      */
     @Override
     public synchronized Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
+        verifyProducerState();
         int partition = 0;
         if (!this.cluster.partitionsForTopic(record.topic()).isEmpty())
             partition = partition(record, this.cluster);
@@ -169,11 +246,17 @@ public class MockProducer<K, V> implements Producer<K, V> {
         Completion completion = new Completion(offset,
                                                new RecordMetadata(topicPartition, 0, offset, RecordBatch.NO_TIMESTAMP, 0, 0, 0),
                                                result, callback);
-        this.sent.add(record);
+
+        if (!this.transactionInFlight)
+            this.sent.add(record);
+        else
+            this.uncommittedSends.add(record);
+
         if (autoComplete)
             completion.complete(null);
         else
             this.completions.addLast(completion);
+
         return future;
     }
 
@@ -212,29 +295,68 @@ public class MockProducer<K, V> implements Producer<K, V> {
 
     @Override
     public void close(long timeout, TimeUnit timeUnit) {
-        if (closed) {
-            throw new IllegalStateException("MockedProducer is already closed.");
+        if (this.closed) {
+            throw new IllegalStateException("MockProducer is already closed.");
         }
-        closed = true;
+        if (transactionInFlight) // an open transaction is aborted before the producer is marked closed
+            abortTransaction(); // NOTE(review): throws ProducerFencedException if fenced, so closed stays false in that case
+        this.closed = true;
     }
 
     public boolean closed() {
-        return closed;
+        return this.closed;
+    }
+
+    public void fenceProducer() {
+        verifyProducerState();
+        verifyTransactionsInitialized();
+        this.producerFenced = true;
+    }
+
+    public boolean transactionInitialized() {
+        return this.transactionInitialized;
+    }
+
+    public boolean transactionInFlight() {
+        return this.transactionInFlight;
+    }
+
+    public boolean transactionCommitted() {
+        return this.transactionCommitted;
+    }
+
+    public boolean transactionAborted() {
+        return this.transactionAborted;
     }
 
     /**
      * Get the list of sent records since the last call to {@link #clear()}
      */
     public synchronized List<ProducerRecord<K, V>> history() {
-        return new ArrayList<ProducerRecord<K, V>>(this.sent);
+        return new ArrayList<>(this.sent);
     }
 
     /**
-     * Clear the stored history of sent records
+     * Get the list of committed consumer group offsets since the last call to {@link #clear()}
+     */
+    public synchronized List<Map<String, Map<TopicPartition, OffsetAndMetadata>>> consumerGroupOffsetsHistory() {
+        return new ArrayList<>(this.consumerGroupOffsets);
+    }
+
+    /**
+     * Clear the stored history of sent records, consumer group offsets, and transactional state
      */
     public synchronized void clear() {
         this.sent.clear();
+        this.uncommittedSends.clear();
         this.completions.clear();
+        this.consumerGroupOffsets.clear();
+        this.uncommittedConsumerGroupOffsets.clear();
+        this.transactionInitialized = false;
+        this.transactionInFlight = false;
+        this.transactionCommitted = false;
+        this.transactionAborted = false;
+        this.producerFenced = false;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/c6984233/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java
index a29b881..468ea49 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java
@@ -16,41 +16,51 @@
  */
 package org.apache.kafka.clients.producer;
 
-import static java.util.Arrays.asList;
-import static java.util.Collections.singletonList;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.clients.producer.internals.DefaultPartitioner;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.ProducerFencedException;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.test.MockSerializer;
 import org.junit.Test;
 
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+import static java.util.Arrays.asList;
+import static java.util.Collections.singletonList;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
 public class MockProducerTest {
 
-    private String topic = "topic";
+    private final String topic = "topic";
+    private final MockProducer<byte[], byte[]> producer = new MockProducer<>(true, new MockSerializer(), new MockSerializer());
+    private final ProducerRecord<byte[], byte[]> record1 = new ProducerRecord<>(topic, "key1".getBytes(), "value1".getBytes());
+    private final ProducerRecord<byte[], byte[]> record2 = new ProducerRecord<>(topic, "key2".getBytes(), "value2".getBytes());
+
 
     @Test
     @SuppressWarnings("unchecked")
     public void testAutoCompleteMock() throws Exception {
-        MockProducer<byte[], byte[]> producer = new MockProducer<>(true, new MockSerializer(), new MockSerializer());
-        ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(topic, "key".getBytes(), "value".getBytes());
-        Future<RecordMetadata> metadata = producer.send(record);
+        Future<RecordMetadata> metadata = producer.send(record1);
         assertTrue("Send should be immediately complete", metadata.isDone());
         assertFalse("Send should be successful", isError(metadata));
         assertEquals("Offset should be 0", 0L, metadata.get().offset());
         assertEquals(topic, metadata.get().topic());
-        assertEquals("We should have the record in our history", singletonList(record), producer.history());
+        assertEquals("We should have the record in our history", singletonList(record1), producer.history());
         producer.clear();
         assertEquals("Clear should erase our history", 0, producer.history().size());
     }
@@ -72,8 +82,6 @@ public class MockProducerTest {
     @Test
     public void testManualCompletion() throws Exception {
         MockProducer<byte[], byte[]> producer = new MockProducer<>(false, new MockSerializer(), new MockSerializer());
-        ProducerRecord<byte[], byte[]> record1 = new ProducerRecord<>(topic, "key1".getBytes(), "value1".getBytes());
-        ProducerRecord<byte[], byte[]> record2 = new ProducerRecord<>(topic, "key2".getBytes(), "value2".getBytes());
         Future<RecordMetadata> md1 = producer.send(record1);
         assertFalse("Send shouldn't have completed", md1.isDone());
         Future<RecordMetadata> md2 = producer.send(record2);
@@ -98,6 +106,428 @@ public class MockProducerTest {
         assertTrue("Requests should be completed.", md3.isDone() && md4.isDone());
     }
 
+    @Test
+    public void shouldInitTransactions() {
+        producer.initTransactions();
+        assertTrue(producer.transactionInitialized());
+    }
+
+    @Test
+    public void shouldThrowOnInitTransactionIfProducerAlreadyInitializedForTransactions() {
+        producer.initTransactions();
+        try {
+            producer.initTransactions();
+            fail("Should have thrown as producer is already initialized");
+        } catch (IllegalStateException e) { }
+    }
+
+    @Test(expected = IllegalStateException.class)
+    public void shouldThrowOnBeginTransactionIfTransactionsNotInitialized() {
+        producer.beginTransaction();
+    }
+
+    @Test
+    public void shouldBeginTransactions() {
+        producer.initTransactions();
+        producer.beginTransaction();
+        assertTrue(producer.transactionInFlight());
+    }
+
+    @Test(expected = IllegalStateException.class)
+    public void shouldThrowOnSendOffsetsToTransactionIfTransactionsNotInitialized() {
+        producer.sendOffsetsToTransaction(null, null);
+    }
+
+    @Test
+    public void shouldThrowOnSendOffsetsToTransactionIfNoTransactionGotStarted() {
+        producer.initTransactions(); // transactions are initialized, but beginTransaction() is never called
+        try {
+            producer.sendOffsetsToTransaction(null, null); // arguments are irrelevant: the state checks run before they are used
+            fail("Should have thrown as producer has no open transaction");
+        } catch (IllegalStateException e) { /* expected: no transaction is in flight */ }
+    }
+
+    @Test(expected = IllegalStateException.class)
+    public void shouldThrowOnCommitIfTransactionsNotInitialized() {
+        producer.commitTransaction();
+    }
+
+    @Test
+    public void shouldThrowOnCommitTransactionIfNoTransactionGotStarted() {
+        producer.initTransactions();
+        try {
+            producer.commitTransaction();
+            fail("Should have thrown as producer has no open transaction");
+        } catch (IllegalStateException e) { }
+    }
+
+    @Test
+    public void shouldCommitEmptyTransaction() {
+        producer.initTransactions();
+        producer.beginTransaction();
+        producer.commitTransaction();
+        assertFalse(producer.transactionInFlight());
+        assertTrue(producer.transactionCommitted());
+        assertFalse(producer.transactionAborted());
+    }
+
+    @Test(expected = IllegalStateException.class)
+    public void shouldThrowOnAbortIfTransactionsNotInitialized() {
+        producer.abortTransaction();
+    }
+
+    @Test
+    public void shouldThrowOnAbortTransactionIfNoTransactionGotStarted() {
+        producer.initTransactions();
+        try {
+            producer.abortTransaction();
+            fail("Should have thrown as producer has no open transaction");
+        } catch (IllegalStateException e) { }
+    }
+
+    @Test
+    public void shouldAbortEmptyTransaction() {
+        producer.initTransactions();
+        producer.beginTransaction();
+        producer.abortTransaction();
+        assertFalse(producer.transactionInFlight());
+        assertTrue(producer.transactionAborted());
+        assertFalse(producer.transactionCommitted());
+    }
+
+    @Test
+    public void shouldAbortInFlightTransactionOnClose() {
+        producer.initTransactions();
+        producer.beginTransaction();
+        producer.close();
+        assertFalse(producer.transactionInFlight());
+        assertTrue(producer.transactionAborted());
+        assertFalse(producer.transactionCommitted());
+    }
+
+    @Test(expected = IllegalStateException.class)
+    public void shouldThrowFenceProducerIfTransactionsNotInitialized() {
+        producer.fenceProducer();
+    }
+
+    @Test
+    public void shouldThrowOnBeginTransactionsIfProducerGotFenced() {
+        producer.initTransactions();
+        producer.fenceProducer();
+        try {
+            producer.beginTransaction();
+            fail("Should have thrown as producer is fenced off");
+        } catch (ProducerFencedException e) { }
+    }
+
+    @Test
+    public void shouldThrowOnSendIfProducerGotFenced() {
+        producer.initTransactions();
+        producer.fenceProducer();
+        try {
+            producer.send(null);
+            fail("Should have thrown as producer is fenced off");
+        } catch (ProducerFencedException e) { }
+    }
+
+    @Test
+    public void shouldThrowOnSendOffsetsToTransactionIfProducerGotFenced() {
+        producer.initTransactions();
+        producer.fenceProducer();
+        try {
+            producer.sendOffsetsToTransaction(null, null);
+            fail("Should have thrown as producer is fenced off");
+        } catch (ProducerFencedException e) { }
+    }
+
+    @Test
+    public void shouldThrowOnCommitTransactionIfProducerGotFenced() {
+        producer.initTransactions();
+        producer.fenceProducer();
+        try {
+            producer.commitTransaction();
+            fail("Should have thrown as producer is fenced off");
+        } catch (ProducerFencedException e) { }
+    }
+
+    @Test
+    public void shouldThrowOnAbortTransactionIfProducerGotFenced() {
+        producer.initTransactions();
+        producer.fenceProducer();
+        try {
+            producer.abortTransaction();
+            fail("Should have thrown as producer is fenced off");
+        } catch (ProducerFencedException e) { }
+    }
+
+    @Test
+    public void shouldPublishMessagesOnlyAfterCommitIfTransactionsAreEnabled() {
+        producer.initTransactions();
+        producer.beginTransaction();
+
+        producer.send(record1);
+        producer.send(record2);
+
+        assertTrue(producer.history().isEmpty());
+
+        producer.commitTransaction();
+
+        List<ProducerRecord<byte[], byte[]>> expectedResult = new ArrayList<>();
+        expectedResult.add(record1);
+        expectedResult.add(record2);
+
+        assertThat(producer.history(), equalTo(expectedResult));
+    }
+
+    @Test
+    public void shouldFlushOnCommitForNonAutoCompleteIfTransactionsAreEnabled() {
+        MockProducer<byte[], byte[]> producer = new MockProducer<>(false, new MockSerializer(), new MockSerializer());
+        producer.initTransactions();
+        producer.beginTransaction();
+
+        Future<RecordMetadata> md1 = producer.send(record1);
+        Future<RecordMetadata> md2 = producer.send(record2);
+
+        assertFalse(md1.isDone());
+        assertFalse(md2.isDone());
+
+        producer.commitTransaction();
+
+        assertTrue(md1.isDone());
+        assertTrue(md2.isDone());
+    }
+
+    @Test
+    public void shouldDropMessagesOnAbortIfTransactionsAreEnabled() {
+        producer.initTransactions();
+
+        producer.beginTransaction();
+        producer.send(record1);
+        producer.send(record2);
+        producer.abortTransaction();
+        assertTrue(producer.history().isEmpty());
+
+        producer.beginTransaction();
+        producer.commitTransaction();
+        assertTrue(producer.history().isEmpty());
+    }
+
+    @Test
+    public void shouldFlushOnAbortForNonAutoCompleteIfTransactionsAreEnabled() {
+        MockProducer<byte[], byte[]> producer = new MockProducer<>(false, new MockSerializer(), new MockSerializer()); // local producer shadows the auto-completing field
+        producer.initTransactions();
+        producer.beginTransaction();
+
+        Future<RecordMetadata> md1 = producer.send(record1);
+        assertFalse(md1.isDone()); // without auto-complete the send stays pending
+
+        producer.abortTransaction(); // abortTransaction() calls flush() before dropping the uncommitted records
+        assertTrue(md1.isDone());
+    }
+
+    @Test
+    public void shouldPreserveCommittedMessagesOnAbortIfTransactionsAreEnabled() {
+        producer.initTransactions();
+
+        producer.beginTransaction();
+        producer.send(record1);
+        producer.send(record2);
+        producer.commitTransaction();
+
+        producer.beginTransaction();
+        producer.abortTransaction();
+
+        List<ProducerRecord<byte[], byte[]>> expectedResult = new ArrayList<>();
+        expectedResult.add(record1);
+        expectedResult.add(record2);
+
+        assertThat(producer.history(), equalTo(expectedResult));
+    }
+
+    @Test
+    public void shouldPublishConsumerGroupOffsetsOnlyAfterCommitIfTransactionsAreEnabled() {
+        producer.initTransactions();
+        producer.beginTransaction();
+
+        String group1 = "g1";
+        Map<TopicPartition, OffsetAndMetadata> group1Commit = new HashMap<TopicPartition, OffsetAndMetadata>() {
+            {
+                put(new TopicPartition(topic, 0), new OffsetAndMetadata(42L, null));
+                put(new TopicPartition(topic, 1), new OffsetAndMetadata(73L, null));
+            }
+        };
+        String group2 = "g2";
+        Map<TopicPartition, OffsetAndMetadata> group2Commit = new HashMap<TopicPartition, OffsetAndMetadata>() {
+            {
+                put(new TopicPartition(topic, 0), new OffsetAndMetadata(101L, null));
+                put(new TopicPartition(topic, 1), new OffsetAndMetadata(21L, null));
+            }
+        };
+        producer.sendOffsetsToTransaction(group1Commit, group1);
+        producer.sendOffsetsToTransaction(group2Commit, group2);
+
+        assertTrue(producer.consumerGroupOffsetsHistory().isEmpty());
+
+        Map<String, Map<TopicPartition, OffsetAndMetadata>> expectedResult = new HashMap<>();
+        expectedResult.put(group1, group1Commit);
+        expectedResult.put(group2, group2Commit);
+
+        producer.commitTransaction();
+        assertThat(producer.consumerGroupOffsetsHistory(), equalTo(Collections.singletonList(expectedResult)));
+    }
+
+    @Test
+    public void shouldPublishLatestAndCumulativeConsumerGroupOffsetsOnlyAfterCommitIfTransactionsAreEnabled() {
+        producer.initTransactions();
+        producer.beginTransaction();
+
+        String group = "g";
+        Map<TopicPartition, OffsetAndMetadata> groupCommit1 = new HashMap<TopicPartition, OffsetAndMetadata>() {
+            {
+                put(new TopicPartition(topic, 0), new OffsetAndMetadata(42L, null));
+                put(new TopicPartition(topic, 1), new OffsetAndMetadata(73L, null));
+            }
+        };
+        Map<TopicPartition, OffsetAndMetadata> groupCommit2 = new HashMap<TopicPartition, OffsetAndMetadata>() {
+            {
+                put(new TopicPartition(topic, 1), new OffsetAndMetadata(101L, null));
+                put(new TopicPartition(topic, 2), new OffsetAndMetadata(21L, null));
+            }
+        };
+        producer.sendOffsetsToTransaction(groupCommit1, group);
+        producer.sendOffsetsToTransaction(groupCommit2, group);
+
+        assertTrue(producer.consumerGroupOffsetsHistory().isEmpty());
+
+        Map<String, Map<TopicPartition, OffsetAndMetadata>> expectedResult = new HashMap<>();
+        expectedResult.put(group, new HashMap<TopicPartition, OffsetAndMetadata>() {
+            {
+                put(new TopicPartition(topic, 0), new OffsetAndMetadata(42L, null));
+                put(new TopicPartition(topic, 1), new OffsetAndMetadata(101L, null));
+                put(new TopicPartition(topic, 2), new OffsetAndMetadata(21L, null));
+            }
+        });
+
+        producer.commitTransaction();
+        assertThat(producer.consumerGroupOffsetsHistory(), equalTo(Collections.singletonList(expectedResult)));
+    }
+
+    @Test
+    public void shouldDropConsumerGroupOffsetsOnAbortIfTransactionsAreEnabled() {
+        producer.initTransactions();
+        producer.beginTransaction();
+
+        String group = "g";
+        Map<TopicPartition, OffsetAndMetadata> groupCommit = new HashMap<TopicPartition, OffsetAndMetadata>() {
+            {
+                put(new TopicPartition(topic, 0), new OffsetAndMetadata(42L, null));
+                put(new TopicPartition(topic, 1), new OffsetAndMetadata(73L, null));
+            }
+        };
+        producer.sendOffsetsToTransaction(groupCommit, group);
+        producer.abortTransaction();
+
+        producer.beginTransaction();
+        producer.commitTransaction();
+        assertTrue(producer.consumerGroupOffsetsHistory().isEmpty());
+    }
+
+    @Test
+    public void shouldPreserveCommittedConsumerGroupsOffsetsOnAbortIfTransactionsAreEnabled() {
+        producer.initTransactions();
+        producer.beginTransaction();
+
+        String group = "g";
+        Map<TopicPartition, OffsetAndMetadata> groupCommit = new HashMap<TopicPartition, OffsetAndMetadata>() {
+            {
+                put(new TopicPartition(topic, 0), new OffsetAndMetadata(42L, null));
+                put(new TopicPartition(topic, 1), new OffsetAndMetadata(73L, null));
+            }
+        };
+        producer.sendOffsetsToTransaction(groupCommit, group);
+        producer.commitTransaction();
+
+        producer.beginTransaction();
+        producer.abortTransaction();
+
+        Map<String, Map<TopicPartition, OffsetAndMetadata>> expectedResult = new HashMap<>();
+        expectedResult.put(group, groupCommit);
+
+        assertThat(producer.consumerGroupOffsetsHistory(), equalTo(Collections.singletonList(expectedResult)));
+    }
+
+    @Test
+    public void shouldThrowOnInitTransactionIfProducerIsClosed() {
+        producer.close();
+        try {
+            producer.initTransactions();
+            fail("Should have thrown as producer is already closed");
+        } catch (IllegalStateException e) { }
+    }
+
+    @Test
+    public void shouldThrowOnSendIfProducerIsClosed() {
+        producer.close();
+        try {
+            producer.send(null);
+            fail("Should have thrown as producer is already closed");
+        } catch (IllegalStateException e) { }
+    }
+
+    @Test
+    public void shouldThrowOnBeginTransactionIfProducerIsClosed() {
+        producer.close();
+        try {
+            producer.beginTransaction();
+            fail("Should have thrown as producer is already closed");
+        } catch (IllegalStateException e) { }
+    }
+
+    @Test
+    public void shouldThrowSendOffsetsToTransactionIfProducerIsClosed() {
+        producer.close();
+        try {
+            producer.sendOffsetsToTransaction(null, null);
+            fail("Should have thrown as producer is already closed");
+        } catch (IllegalStateException e) { }
+    }
+
+    @Test
+    public void shouldThrowOnCommitTransactionIfProducerIsClosed() {
+        producer.close();
+        try {
+            producer.commitTransaction();
+            fail("Should have thrown as producer is already closed");
+        } catch (IllegalStateException e) { }
+    }
+
+    @Test
+    public void shouldThrowOnAbortTransactionIfProducerIsClosed() {
+        producer.close();
+        try {
+            producer.abortTransaction();
+            fail("Should have thrown as producer is already closed");
+        } catch (IllegalStateException e) { }
+    }
+
+    @Test
+    public void shouldThrowOnCloseIfProducerIsClosed() {
+        producer.close();
+        try {
+            producer.close();
+            fail("Should have thrown as producer is already closed");
+        } catch (IllegalStateException e) { }
+    }
+
+    @Test
+    public void shouldThrowOnFenceProducerIfProducerIsClosed() {
+        producer.close();
+        try {
+            producer.fenceProducer();
+            fail("Should have thrown as producer is already closed");
+        } catch (IllegalStateException e) { }
+    }
+
     private boolean isError(Future<?> future) {
         try {
             future.get();