You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2017/09/14 10:24:53 UTC

kafka git commit: MINOR: Update TransactionManager to use LogContext

Repository: kafka
Updated Branches:
  refs/heads/trunk a0ad9f156 -> 2656659e0


MINOR: Update TransactionManager to use LogContext

Author: Jason Gustafson <ja...@confluent.io>

Reviewers: Manikumar Reddy <ma...@gmail.com>, Ismael Juma <is...@juma.me.uk>

Closes #3852 from hachikuji/minor-use-log-context-txn-manager


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/2656659e
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/2656659e
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/2656659e

Branch: refs/heads/trunk
Commit: 2656659e0d7c0e427768ce216df2698acc8c9b11
Parents: a0ad9f1
Author: Jason Gustafson <ja...@confluent.io>
Authored: Thu Sep 14 11:23:53 2017 +0100
Committer: Ismael Juma <is...@juma.me.uk>
Committed: Thu Sep 14 11:24:25 2017 +0100

----------------------------------------------------------------------
 .../kafka/clients/producer/KafkaProducer.java   |  6 +--
 .../clients/producer/internals/Sender.java      | 12 ++---
 .../producer/internals/TransactionManager.java  | 53 +++++++++-----------
 .../clients/producer/internals/SenderTest.java  | 24 ++++-----
 .../internals/TransactionManagerTest.java       |  3 +-
 5 files changed, 46 insertions(+), 52 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/2656659e/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index 816566f..18248bb 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -363,7 +363,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
 
             this.maxBlockTimeMs = config.getLong(ProducerConfig.MAX_BLOCK_MS_CONFIG);
             this.requestTimeoutMs = config.getInt(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG);
-            this.transactionManager = configureTransactionState(config, log);
+            this.transactionManager = configureTransactionState(config, logContext, log);
             int retries = configureRetries(config, transactionManager != null, log);
             int maxInflightRequests = configureInflightRequests(config, transactionManager != null, log);
             short acks = configureAcks(config, transactionManager != null, log);
@@ -429,7 +429,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
         }
     }
 
-    private static TransactionManager configureTransactionState(ProducerConfig config, Logger log) {
+    private static TransactionManager configureTransactionState(ProducerConfig config, LogContext logContext, Logger log) {
 
         TransactionManager transactionManager = null;
 
@@ -453,7 +453,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
             String transactionalId = config.getString(ProducerConfig.TRANSACTIONAL_ID_CONFIG);
             int transactionTimeoutMs = config.getInt(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG);
             long retryBackoffMs = config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG);
-            transactionManager = new TransactionManager(transactionalId, transactionTimeoutMs, retryBackoffMs);
+            transactionManager = new TransactionManager(logContext, transactionalId, transactionTimeoutMs, retryBackoffMs);
             if (transactionManager.isTransactional())
                 log.info("Instantiated a transactional producer.");
             else

http://git-wip-us.apache.org/repos/asf/kafka/blob/2656659e/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
index bf3714e..8da411c 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
@@ -346,15 +346,14 @@ public class Sender implements Runnable {
                     ClientRequest clientRequest = client.newClientRequest(targetNode.idString(),
                             requestBuilder, now, true, nextRequestHandler);
                     transactionManager.setInFlightRequestCorrelationId(clientRequest.correlationId());
-                    log.debug("{}Sending transactional request {} to node {}",
-                            transactionManager.logPrefix, requestBuilder, targetNode);
+                    log.debug("Sending transactional request {} to node {}", requestBuilder, targetNode);
 
                     client.send(clientRequest, now);
                     return true;
                 }
             } catch (IOException e) {
-                log.debug("{}Disconnect from {} while trying to send request {}. Going " +
-                        "to back off and retry", transactionManager.logPrefix, targetNode, requestBuilder);
+                log.debug("Disconnect from {} while trying to send request {}. Going " +
+                        "to back off and retry", targetNode, requestBuilder);
                 if (nextRequestHandler.needsCoordinator()) {
                     // We break here so that we pick up the FindCoordinator request immediately.
                     transactionManager.lookupCoordinator(nextRequestHandler);
@@ -372,10 +371,7 @@ public class Sender implements Runnable {
 
     private void maybeAbortBatches(RuntimeException exception) {
         if (accumulator.hasIncomplete()) {
-            String logPrefix = "";
-            if (transactionManager != null)
-                logPrefix = transactionManager.logPrefix;
-            log.error("{}Aborting producer batches due to fatal error", logPrefix, exception);
+            log.error("Aborting producer batches due to fatal error", exception);
             accumulator.abortBatches(exception);
         }
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/2656659e/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
index fad0332..05d943c 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
@@ -41,8 +41,8 @@ import org.apache.kafka.common.requests.TransactionResult;
 import org.apache.kafka.common.requests.TxnOffsetCommitRequest;
 import org.apache.kafka.common.requests.TxnOffsetCommitRequest.CommittedOffset;
 import org.apache.kafka.common.requests.TxnOffsetCommitResponse;
+import org.apache.kafka.common.utils.LogContext;
 import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
 import java.util.Comparator;
@@ -59,14 +59,12 @@ import static org.apache.kafka.common.record.RecordBatch.NO_PRODUCER_ID;
  * A class which maintains state for transactions. Also keeps the state necessary to ensure idempotent production.
  */
 public class TransactionManager {
-    private static final Logger log = LoggerFactory.getLogger(TransactionManager.class);
     private static final int NO_INFLIGHT_REQUEST_CORRELATION_ID = -1;
 
+    private final Logger log;
     private final String transactionalId;
     private final int transactionTimeoutMs;
 
-    public final String logPrefix;
-
     private final Map<TopicPartition, Integer> sequenceNumbers;
     private final PriorityQueue<TxnRequestHandler> pendingRequests;
     private final Set<TopicPartition> newPartitionsInTransaction;
@@ -142,11 +140,11 @@ public class TransactionManager {
         }
     }
 
-    public TransactionManager(String transactionalId, int transactionTimeoutMs, long retryBackoffMs) {
+    public TransactionManager(LogContext logContext, String transactionalId, int transactionTimeoutMs, long retryBackoffMs) {
         this.producerIdAndEpoch = new ProducerIdAndEpoch(NO_PRODUCER_ID, NO_PRODUCER_EPOCH);
         this.sequenceNumbers = new HashMap<>();
         this.transactionalId = transactionalId;
-        this.logPrefix = transactionalId == null ? "" : "[TransactionalId " + transactionalId + "] ";
+        this.log = logContext.logger(TransactionManager.class);
         this.transactionTimeoutMs = transactionTimeoutMs;
         this.transactionCoordinator = null;
         this.consumerGroupCoordinator = null;
@@ -165,7 +163,7 @@ public class TransactionManager {
     }
 
     TransactionManager() {
-        this(null, 0, 100);
+        this(new LogContext(), null, 0, 100);
     }
 
     public synchronized TransactionalRequestResult initializeTransactions() {
@@ -221,7 +219,7 @@ public class TransactionManager {
             throw new KafkaException("Cannot send offsets to transaction either because the producer is not in an " +
                     "active transaction");
 
-        log.debug("{}Begin adding offsets {} for consumer group {} to transaction", logPrefix, offsets, consumerGroupId);
+        log.debug("Begin adding offsets {} for consumer group {} to transaction", offsets, consumerGroupId);
         AddOffsetsToTxnRequest.Builder builder = new AddOffsetsToTxnRequest.Builder(transactionalId,
                 producerIdAndEpoch.producerId, producerIdAndEpoch.epoch, consumerGroupId);
         AddOffsetsToTxnHandler handler = new AddOffsetsToTxnHandler(builder, offsets);
@@ -235,7 +233,7 @@ public class TransactionManager {
         if (isPartitionAdded(topicPartition) || isPartitionPendingAdd(topicPartition))
             return;
 
-        log.debug("{}Begin adding new partition {} to transaction", logPrefix, topicPartition);
+        log.debug("Begin adding new partition {} to transaction", topicPartition);
         newPartitionsInTransaction.add(topicPartition);
     }
 
@@ -338,8 +336,7 @@ public class TransactionManager {
      * Set the producer id and epoch atomically.
      */
     void setProducerIdAndEpoch(ProducerIdAndEpoch producerIdAndEpoch) {
-        log.info("{}ProducerId set to {} with epoch {}", logPrefix, producerIdAndEpoch.producerId,
-                producerIdAndEpoch.epoch);
+        log.info("ProducerId set to {} with epoch {}", producerIdAndEpoch.producerId, producerIdAndEpoch.epoch);
         this.producerIdAndEpoch = producerIdAndEpoch;
     }
 
@@ -403,23 +400,23 @@ public class TransactionManager {
 
         pendingRequests.poll();
         if (maybeTerminateRequestWithError(nextRequestHandler)) {
-            log.trace("{}Not sending transactional request {} because we are in an error state",
-                    logPrefix, nextRequestHandler.requestBuilder());
+            log.trace("Not sending transactional request {} because we are in an error state",
+                    nextRequestHandler.requestBuilder());
             return null;
         }
 
         if (nextRequestHandler.isEndTxn() && !transactionStarted) {
             nextRequestHandler.result.done();
             if (currentState != State.FATAL_ERROR) {
-                log.debug("{}Not sending EndTxn for completed transaction since no partitions " +
-                        "or offsets were successfully added", logPrefix);
+                log.debug("Not sending EndTxn for completed transaction since no partitions " +
+                        "or offsets were successfully added");
                 completeTransaction();
             }
             nextRequestHandler = pendingRequests.poll();
         }
 
         if (nextRequestHandler != null)
-            log.trace("{}Request {} dequeued for sending", logPrefix, nextRequestHandler.requestBuilder());
+            log.trace("Request {} dequeued for sending", nextRequestHandler.requestBuilder());
 
         return nextRequestHandler;
     }
@@ -507,9 +504,9 @@ public class TransactionManager {
         }
 
         if (lastError != null)
-            log.debug("{}Transition from state {} to error state {}", logPrefix, currentState, target, lastError);
+            log.debug("Transition from state {} to error state {}", currentState, target, lastError);
         else
-            log.debug("{}Transition from state {} to {}", logPrefix, currentState, target);
+            log.debug("Transition from state {} to {}", currentState, target);
 
         currentState = target;
     }
@@ -537,7 +534,7 @@ public class TransactionManager {
     }
 
     private void enqueueRequest(TxnRequestHandler requestHandler) {
-        log.debug("{}Enqueuing transactional request {}", logPrefix, requestHandler.requestBuilder());
+        log.debug("Enqueuing transactional request {}", requestHandler.requestBuilder());
         pendingRequests.add(requestHandler);
     }
 
@@ -634,15 +631,15 @@ public class TransactionManager {
             } else {
                 clearInFlightRequestCorrelationId();
                 if (response.wasDisconnected()) {
-                    log.debug("{}Disconnected from {}. Will retry.", logPrefix, response.destination());
+                    log.debug("Disconnected from {}. Will retry.", response.destination());
                     if (this.needsCoordinator())
                         lookupCoordinator(this.coordinatorType(), this.coordinatorKey());
                     reenqueue();
                 } else if (response.versionMismatch() != null) {
                     fatalError(response.versionMismatch());
                 } else if (response.hasResponse()) {
-                    log.trace("{}Received transactional response {} for request {}", logPrefix,
-                            response.responseBody(), requestBuilder());
+                    log.trace("Received transactional response {} for request {}", response.responseBody(),
+                            requestBuilder());
                     synchronized (TransactionManager.this) {
                         handleResponse(response.responseBody());
                     }
@@ -781,10 +778,11 @@ public class TransactionManager {
                 } else if (error == Errors.TOPIC_AUTHORIZATION_FAILED) {
                     unauthorizedTopics.add(topicPartition.topic());
                 } else if (error == Errors.OPERATION_NOT_ATTEMPTED) {
-                    log.debug("{}Did not attempt to add partition {} to transaction because other partitions in the batch had errors.", logPrefix, topicPartition);
+                    log.debug("Did not attempt to add partition {} to transaction because other partitions in the " +
+                            "batch had errors.", topicPartition);
                     hasPartitionErrors = true;
                 } else {
-                    log.error("{}Could not add partition {} due to unexpected error {}", logPrefix, topicPartition, error);
+                    log.error("Could not add partition {} due to unexpected error {}", topicPartition, error);
                     hasPartitionErrors = true;
                 }
             }
@@ -803,7 +801,7 @@ public class TransactionManager {
             } else if (hasPartitionErrors) {
                 abortableError(new KafkaException("Could not add partitions to transaction due to errors: " + errors));
             } else {
-                log.debug("{}Successfully added partitions {} to transaction", logPrefix, partitions);
+                log.debug("Successfully added partitions {} to transaction", partitions);
                 partitionsInTransaction.addAll(partitions);
                 transactionStarted = true;
                 result.done();
@@ -956,8 +954,7 @@ public class TransactionManager {
             Errors error = addOffsetsToTxnResponse.error();
 
             if (error == Errors.NONE) {
-                log.debug("{}Successfully added partition for consumer group {} to transaction", logPrefix,
-                        builder.consumerGroupId());
+                log.debug("Successfully added partition for consumer group {} to transaction", builder.consumerGroupId());
 
                 // note the result is not completed until the TxnOffsetCommit returns
                 pendingRequests.add(txnOffsetCommitHandler(result, offsets, builder.consumerGroupId()));
@@ -1019,7 +1016,7 @@ public class TransactionManager {
                 TopicPartition topicPartition = entry.getKey();
                 Errors error = entry.getValue();
                 if (error == Errors.NONE) {
-                    log.debug("{}Successfully added offsets {} from consumer group {} to transaction.", logPrefix,
+                    log.debug("Successfully added offsets {} from consumer group {} to transaction.",
                             builder.offsets(), builder.consumerGroupId());
                     pendingTxnOffsetCommits.remove(topicPartition);
                 } else if (error == Errors.COORDINATOR_NOT_AVAILABLE

http://git-wip-us.apache.org/repos/asf/kafka/blob/2656659e/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
index 85b5ba6..6f98e52 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
@@ -104,7 +104,7 @@ public class SenderTest {
     private RecordAccumulator accumulator = null;
     private Sender sender = null;
     private SenderMetricsRegistry senderMetricsRegistry = null;
-    private final LogContext loggerFactory = new LogContext();
+    private final LogContext logContext = new LogContext();
 
     @Before
     public void setup() {
@@ -240,7 +240,7 @@ public class SenderTest {
         Node node = cluster.nodes().get(0);
         NetworkClient client = new NetworkClient(selector, metadata, "mock", Integer.MAX_VALUE,
                 1000, 1000, 64 * 1024, 64 * 1024, 1000,
-                time, true, new ApiVersions(), throttleTimeSensor, new LogContext());
+                time, true, new ApiVersions(), throttleTimeSensor, logContext);
 
         short apiVersionsResponseVersion = ApiKeys.API_VERSIONS.latestVersion();
         ByteBuffer buffer = ApiVersionsResponse.createApiVersionsResponse(400, RecordBatch.CURRENT_MAGIC_VALUE).serialize(apiVersionsResponseVersion, new ResponseHeader(0));
@@ -277,7 +277,7 @@ public class SenderTest {
         int maxRetries = 1;
         Metrics m = new Metrics();
         try {
-            Sender sender = new Sender(loggerFactory, client, metadata, this.accumulator, false, MAX_REQUEST_SIZE, ACKS_ALL,
+            Sender sender = new Sender(logContext, client, metadata, this.accumulator, false, MAX_REQUEST_SIZE, ACKS_ALL,
                     maxRetries, m, new SenderMetricsRegistry(), time, REQUEST_TIMEOUT, 50, null, apiVersions);
             // do a successful retry
             Future<RecordMetadata> future = accumulator.append(tp0, 0L, "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
@@ -324,7 +324,7 @@ public class SenderTest {
         int maxRetries = 1;
         Metrics m = new Metrics();
         try {
-            Sender sender = new Sender(loggerFactory, client, metadata, this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL, maxRetries,
+            Sender sender = new Sender(logContext, client, metadata, this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL, maxRetries,
                     m, new SenderMetricsRegistry(), time, REQUEST_TIMEOUT, 50, null, apiVersions);
             // Create a two broker cluster, with partition 0 on broker 0 and partition 1 on broker 1
             Cluster cluster1 = TestUtils.clusterWith(2, "test", 2);
@@ -576,7 +576,7 @@ public class SenderTest {
 
         int maxRetries = 10;
         Metrics m = new Metrics();
-        Sender sender = new Sender(loggerFactory, client, metadata, this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL, maxRetries,
+        Sender sender = new Sender(logContext, client, metadata, this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL, maxRetries,
                 m, new SenderMetricsRegistry(), time, REQUEST_TIMEOUT, 50, transactionManager, apiVersions);
 
         Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
@@ -618,7 +618,7 @@ public class SenderTest {
         int maxRetries = 10;
         Metrics m = new Metrics();
         SenderMetricsRegistry metricsRegistry = new SenderMetricsRegistry();
-        Sender sender = new Sender(loggerFactory, client, metadata, this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL, maxRetries,
+        Sender sender = new Sender(logContext, client, metadata, this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL, maxRetries,
                 m, metricsRegistry, time, REQUEST_TIMEOUT, 50, transactionManager, apiVersions);
 
         Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
@@ -655,7 +655,7 @@ public class SenderTest {
 
         int maxRetries = 10;
         Metrics m = new Metrics();
-        Sender sender = new Sender(loggerFactory, client, metadata, this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL, maxRetries,
+        Sender sender = new Sender(logContext, client, metadata, this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL, maxRetries,
                 m, new SenderMetricsRegistry(), time, REQUEST_TIMEOUT, 50, transactionManager, apiVersions);
 
         Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
@@ -684,7 +684,7 @@ public class SenderTest {
     public void testTransactionalSplitBatchAndSend() throws Exception {
         ProducerIdAndEpoch producerIdAndEpoch = new ProducerIdAndEpoch(123456L, (short) 0);
         TopicPartition tp = new TopicPartition("testSplitBatchAndSend", 1);
-        TransactionManager txnManager = new TransactionManager("testSplitBatchAndSend", 60000, 100);
+        TransactionManager txnManager = new TransactionManager(logContext, "testSplitBatchAndSend", 60000, 100);
 
         setupWithTransactionState(txnManager);
         doInitTransactions(txnManager, producerIdAndEpoch);
@@ -705,10 +705,10 @@ public class SenderTest {
         // Set a good compression ratio.
         CompressionRatioEstimator.setEstimation(topic, CompressionType.GZIP, 0.2f);
         try (Metrics m = new Metrics()) {
-            accumulator = new RecordAccumulator(loggerFactory, batchSize, 1024 * 1024, CompressionType.GZIP, 0L, 0L, m, time,
+            accumulator = new RecordAccumulator(logContext, batchSize, 1024 * 1024, CompressionType.GZIP, 0L, 0L, m, time,
                     new ApiVersions(), txnManager);
             SenderMetricsRegistry metricsRegistry = new SenderMetricsRegistry();
-            Sender sender = new Sender(loggerFactory, client, metadata, this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL, maxRetries,
+            Sender sender = new Sender(logContext, client, metadata, this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL, maxRetries,
                     m, metricsRegistry, time, REQUEST_TIMEOUT, 1000L, txnManager, new ApiVersions());
             // Create a two broker cluster, with partition 0 on broker 0 and partition 1 on broker 1
             Cluster cluster1 = TestUtils.clusterWith(2, topic, 2);
@@ -826,10 +826,10 @@ public class SenderTest {
         metricTags.put("client-id", CLIENT_ID);
         MetricConfig metricConfig = new MetricConfig().tags(metricTags);
         this.metrics = new Metrics(metricConfig, time);
-        this.accumulator = new RecordAccumulator(loggerFactory, batchSize, 1024 * 1024, CompressionType.NONE, 0L, 0L, metrics, time,
+        this.accumulator = new RecordAccumulator(logContext, batchSize, 1024 * 1024, CompressionType.NONE, 0L, 0L, metrics, time,
                 apiVersions, transactionManager);
         this.senderMetricsRegistry = new SenderMetricsRegistry(metricTags.keySet());
-        this.sender = new Sender(loggerFactory, this.client, this.metadata, this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL,
+        this.sender = new Sender(logContext, this.client, this.metadata, this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL,
                 MAX_RETRIES, this.metrics, this.senderMetricsRegistry, this.time, REQUEST_TIMEOUT, 50, transactionManager, apiVersions);
         this.metadata.update(this.cluster, Collections.<String>emptySet(), time.milliseconds());
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/2656659e/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
index 1219b9c..53bba1c 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
@@ -120,7 +120,8 @@ public class TransactionManagerTest {
         int batchSize = 16 * 1024;
         MetricConfig metricConfig = new MetricConfig().tags(metricTags);
         this.brokerNode = new Node(0, "localhost", 2211);
-        this.transactionManager = new TransactionManager(transactionalId, transactionTimeoutMs, DEFAULT_RETRY_BACKOFF_MS);
+        this.transactionManager = new TransactionManager(logContext, transactionalId, transactionTimeoutMs,
+                DEFAULT_RETRY_BACKOFF_MS);
         Metrics metrics = new Metrics(metricConfig, time);
         this.accumulator = new RecordAccumulator(logContext, batchSize, 1024 * 1024, CompressionType.NONE, 0L, 0L, metrics, time, apiVersions, transactionManager);
         this.sender = new Sender(logContext, this.client, this.metadata, this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL,