You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2017/09/14 10:24:53 UTC
kafka git commit: MINOR: Update TransactionManager to use LogContext
Repository: kafka
Updated Branches:
refs/heads/trunk a0ad9f156 -> 2656659e0
MINOR: Update TransactionManager to use LogContext
Author: Jason Gustafson <ja...@confluent.io>
Reviewers: Manikumar Reddy <ma...@gmail.com>, Ismael Juma <is...@juma.me.uk>
Closes #3852 from hachikuji/minor-use-log-context-txn-manager
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/2656659e
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/2656659e
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/2656659e
Branch: refs/heads/trunk
Commit: 2656659e0d7c0e427768ce216df2698acc8c9b11
Parents: a0ad9f1
Author: Jason Gustafson <ja...@confluent.io>
Authored: Thu Sep 14 11:23:53 2017 +0100
Committer: Ismael Juma <is...@juma.me.uk>
Committed: Thu Sep 14 11:24:25 2017 +0100
----------------------------------------------------------------------
.../kafka/clients/producer/KafkaProducer.java | 6 +--
.../clients/producer/internals/Sender.java | 12 ++---
.../producer/internals/TransactionManager.java | 53 +++++++++-----------
.../clients/producer/internals/SenderTest.java | 24 ++++-----
.../internals/TransactionManagerTest.java | 3 +-
5 files changed, 46 insertions(+), 52 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/2656659e/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index 816566f..18248bb 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -363,7 +363,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
this.maxBlockTimeMs = config.getLong(ProducerConfig.MAX_BLOCK_MS_CONFIG);
this.requestTimeoutMs = config.getInt(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG);
- this.transactionManager = configureTransactionState(config, log);
+ this.transactionManager = configureTransactionState(config, logContext, log);
int retries = configureRetries(config, transactionManager != null, log);
int maxInflightRequests = configureInflightRequests(config, transactionManager != null, log);
short acks = configureAcks(config, transactionManager != null, log);
@@ -429,7 +429,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
}
}
- private static TransactionManager configureTransactionState(ProducerConfig config, Logger log) {
+ private static TransactionManager configureTransactionState(ProducerConfig config, LogContext logContext, Logger log) {
TransactionManager transactionManager = null;
@@ -453,7 +453,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
String transactionalId = config.getString(ProducerConfig.TRANSACTIONAL_ID_CONFIG);
int transactionTimeoutMs = config.getInt(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG);
long retryBackoffMs = config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG);
- transactionManager = new TransactionManager(transactionalId, transactionTimeoutMs, retryBackoffMs);
+ transactionManager = new TransactionManager(logContext, transactionalId, transactionTimeoutMs, retryBackoffMs);
if (transactionManager.isTransactional())
log.info("Instantiated a transactional producer.");
else
http://git-wip-us.apache.org/repos/asf/kafka/blob/2656659e/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
index bf3714e..8da411c 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
@@ -346,15 +346,14 @@ public class Sender implements Runnable {
ClientRequest clientRequest = client.newClientRequest(targetNode.idString(),
requestBuilder, now, true, nextRequestHandler);
transactionManager.setInFlightRequestCorrelationId(clientRequest.correlationId());
- log.debug("{}Sending transactional request {} to node {}",
- transactionManager.logPrefix, requestBuilder, targetNode);
+ log.debug("Sending transactional request {} to node {}", requestBuilder, targetNode);
client.send(clientRequest, now);
return true;
}
} catch (IOException e) {
- log.debug("{}Disconnect from {} while trying to send request {}. Going " +
- "to back off and retry", transactionManager.logPrefix, targetNode, requestBuilder);
+ log.debug("Disconnect from {} while trying to send request {}. Going " +
+ "to back off and retry", targetNode, requestBuilder);
if (nextRequestHandler.needsCoordinator()) {
// We break here so that we pick up the FindCoordinator request immediately.
transactionManager.lookupCoordinator(nextRequestHandler);
@@ -372,10 +371,7 @@ public class Sender implements Runnable {
private void maybeAbortBatches(RuntimeException exception) {
if (accumulator.hasIncomplete()) {
- String logPrefix = "";
- if (transactionManager != null)
- logPrefix = transactionManager.logPrefix;
- log.error("{}Aborting producer batches due to fatal error", logPrefix, exception);
+ log.error("Aborting producer batches due to fatal error", exception);
accumulator.abortBatches(exception);
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/2656659e/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
index fad0332..05d943c 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
@@ -41,8 +41,8 @@ import org.apache.kafka.common.requests.TransactionResult;
import org.apache.kafka.common.requests.TxnOffsetCommitRequest;
import org.apache.kafka.common.requests.TxnOffsetCommitRequest.CommittedOffset;
import org.apache.kafka.common.requests.TxnOffsetCommitResponse;
+import org.apache.kafka.common.utils.LogContext;
import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Comparator;
@@ -59,14 +59,12 @@ import static org.apache.kafka.common.record.RecordBatch.NO_PRODUCER_ID;
* A class which maintains state for transactions. Also keeps the state necessary to ensure idempotent production.
*/
public class TransactionManager {
- private static final Logger log = LoggerFactory.getLogger(TransactionManager.class);
private static final int NO_INFLIGHT_REQUEST_CORRELATION_ID = -1;
+ private final Logger log;
private final String transactionalId;
private final int transactionTimeoutMs;
- public final String logPrefix;
-
private final Map<TopicPartition, Integer> sequenceNumbers;
private final PriorityQueue<TxnRequestHandler> pendingRequests;
private final Set<TopicPartition> newPartitionsInTransaction;
@@ -142,11 +140,11 @@ public class TransactionManager {
}
}
- public TransactionManager(String transactionalId, int transactionTimeoutMs, long retryBackoffMs) {
+ public TransactionManager(LogContext logContext, String transactionalId, int transactionTimeoutMs, long retryBackoffMs) {
this.producerIdAndEpoch = new ProducerIdAndEpoch(NO_PRODUCER_ID, NO_PRODUCER_EPOCH);
this.sequenceNumbers = new HashMap<>();
this.transactionalId = transactionalId;
- this.logPrefix = transactionalId == null ? "" : "[TransactionalId " + transactionalId + "] ";
+ this.log = logContext.logger(TransactionManager.class);
this.transactionTimeoutMs = transactionTimeoutMs;
this.transactionCoordinator = null;
this.consumerGroupCoordinator = null;
@@ -165,7 +163,7 @@ public class TransactionManager {
}
TransactionManager() {
- this(null, 0, 100);
+ this(new LogContext(), null, 0, 100);
}
public synchronized TransactionalRequestResult initializeTransactions() {
@@ -221,7 +219,7 @@ public class TransactionManager {
throw new KafkaException("Cannot send offsets to transaction either because the producer is not in an " +
"active transaction");
- log.debug("{}Begin adding offsets {} for consumer group {} to transaction", logPrefix, offsets, consumerGroupId);
+ log.debug("Begin adding offsets {} for consumer group {} to transaction", offsets, consumerGroupId);
AddOffsetsToTxnRequest.Builder builder = new AddOffsetsToTxnRequest.Builder(transactionalId,
producerIdAndEpoch.producerId, producerIdAndEpoch.epoch, consumerGroupId);
AddOffsetsToTxnHandler handler = new AddOffsetsToTxnHandler(builder, offsets);
@@ -235,7 +233,7 @@ public class TransactionManager {
if (isPartitionAdded(topicPartition) || isPartitionPendingAdd(topicPartition))
return;
- log.debug("{}Begin adding new partition {} to transaction", logPrefix, topicPartition);
+ log.debug("Begin adding new partition {} to transaction", topicPartition);
newPartitionsInTransaction.add(topicPartition);
}
@@ -338,8 +336,7 @@ public class TransactionManager {
* Set the producer id and epoch atomically.
*/
void setProducerIdAndEpoch(ProducerIdAndEpoch producerIdAndEpoch) {
- log.info("{}ProducerId set to {} with epoch {}", logPrefix, producerIdAndEpoch.producerId,
- producerIdAndEpoch.epoch);
+ log.info("ProducerId set to {} with epoch {}", producerIdAndEpoch.producerId, producerIdAndEpoch.epoch);
this.producerIdAndEpoch = producerIdAndEpoch;
}
@@ -403,23 +400,23 @@ public class TransactionManager {
pendingRequests.poll();
if (maybeTerminateRequestWithError(nextRequestHandler)) {
- log.trace("{}Not sending transactional request {} because we are in an error state",
- logPrefix, nextRequestHandler.requestBuilder());
+ log.trace("Not sending transactional request {} because we are in an error state",
+ nextRequestHandler.requestBuilder());
return null;
}
if (nextRequestHandler.isEndTxn() && !transactionStarted) {
nextRequestHandler.result.done();
if (currentState != State.FATAL_ERROR) {
- log.debug("{}Not sending EndTxn for completed transaction since no partitions " +
- "or offsets were successfully added", logPrefix);
+ log.debug("Not sending EndTxn for completed transaction since no partitions " +
+ "or offsets were successfully added");
completeTransaction();
}
nextRequestHandler = pendingRequests.poll();
}
if (nextRequestHandler != null)
- log.trace("{}Request {} dequeued for sending", logPrefix, nextRequestHandler.requestBuilder());
+ log.trace("Request {} dequeued for sending", nextRequestHandler.requestBuilder());
return nextRequestHandler;
}
@@ -507,9 +504,9 @@ public class TransactionManager {
}
if (lastError != null)
- log.debug("{}Transition from state {} to error state {}", logPrefix, currentState, target, lastError);
+ log.debug("Transition from state {} to error state {}", currentState, target, lastError);
else
- log.debug("{}Transition from state {} to {}", logPrefix, currentState, target);
+ log.debug("Transition from state {} to {}", currentState, target);
currentState = target;
}
@@ -537,7 +534,7 @@ public class TransactionManager {
}
private void enqueueRequest(TxnRequestHandler requestHandler) {
- log.debug("{}Enqueuing transactional request {}", logPrefix, requestHandler.requestBuilder());
+ log.debug("Enqueuing transactional request {}", requestHandler.requestBuilder());
pendingRequests.add(requestHandler);
}
@@ -634,15 +631,15 @@ public class TransactionManager {
} else {
clearInFlightRequestCorrelationId();
if (response.wasDisconnected()) {
- log.debug("{}Disconnected from {}. Will retry.", logPrefix, response.destination());
+ log.debug("Disconnected from {}. Will retry.", response.destination());
if (this.needsCoordinator())
lookupCoordinator(this.coordinatorType(), this.coordinatorKey());
reenqueue();
} else if (response.versionMismatch() != null) {
fatalError(response.versionMismatch());
} else if (response.hasResponse()) {
- log.trace("{}Received transactional response {} for request {}", logPrefix,
- response.responseBody(), requestBuilder());
+ log.trace("Received transactional response {} for request {}", response.responseBody(),
+ requestBuilder());
synchronized (TransactionManager.this) {
handleResponse(response.responseBody());
}
@@ -781,10 +778,11 @@ public class TransactionManager {
} else if (error == Errors.TOPIC_AUTHORIZATION_FAILED) {
unauthorizedTopics.add(topicPartition.topic());
} else if (error == Errors.OPERATION_NOT_ATTEMPTED) {
- log.debug("{}Did not attempt to add partition {} to transaction because other partitions in the batch had errors.", logPrefix, topicPartition);
+ log.debug("Did not attempt to add partition {} to transaction because other partitions in the " +
+ "batch had errors.", topicPartition);
hasPartitionErrors = true;
} else {
- log.error("{}Could not add partition {} due to unexpected error {}", logPrefix, topicPartition, error);
+ log.error("Could not add partition {} due to unexpected error {}", topicPartition, error);
hasPartitionErrors = true;
}
}
@@ -803,7 +801,7 @@ public class TransactionManager {
} else if (hasPartitionErrors) {
abortableError(new KafkaException("Could not add partitions to transaction due to errors: " + errors));
} else {
- log.debug("{}Successfully added partitions {} to transaction", logPrefix, partitions);
+ log.debug("Successfully added partitions {} to transaction", partitions);
partitionsInTransaction.addAll(partitions);
transactionStarted = true;
result.done();
@@ -956,8 +954,7 @@ public class TransactionManager {
Errors error = addOffsetsToTxnResponse.error();
if (error == Errors.NONE) {
- log.debug("{}Successfully added partition for consumer group {} to transaction", logPrefix,
- builder.consumerGroupId());
+ log.debug("Successfully added partition for consumer group {} to transaction", builder.consumerGroupId());
// note the result is not completed until the TxnOffsetCommit returns
pendingRequests.add(txnOffsetCommitHandler(result, offsets, builder.consumerGroupId()));
@@ -1019,7 +1016,7 @@ public class TransactionManager {
TopicPartition topicPartition = entry.getKey();
Errors error = entry.getValue();
if (error == Errors.NONE) {
- log.debug("{}Successfully added offsets {} from consumer group {} to transaction.", logPrefix,
+ log.debug("Successfully added offsets {} from consumer group {} to transaction.",
builder.offsets(), builder.consumerGroupId());
pendingTxnOffsetCommits.remove(topicPartition);
} else if (error == Errors.COORDINATOR_NOT_AVAILABLE
http://git-wip-us.apache.org/repos/asf/kafka/blob/2656659e/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
index 85b5ba6..6f98e52 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
@@ -104,7 +104,7 @@ public class SenderTest {
private RecordAccumulator accumulator = null;
private Sender sender = null;
private SenderMetricsRegistry senderMetricsRegistry = null;
- private final LogContext loggerFactory = new LogContext();
+ private final LogContext logContext = new LogContext();
@Before
public void setup() {
@@ -240,7 +240,7 @@ public class SenderTest {
Node node = cluster.nodes().get(0);
NetworkClient client = new NetworkClient(selector, metadata, "mock", Integer.MAX_VALUE,
1000, 1000, 64 * 1024, 64 * 1024, 1000,
- time, true, new ApiVersions(), throttleTimeSensor, new LogContext());
+ time, true, new ApiVersions(), throttleTimeSensor, logContext);
short apiVersionsResponseVersion = ApiKeys.API_VERSIONS.latestVersion();
ByteBuffer buffer = ApiVersionsResponse.createApiVersionsResponse(400, RecordBatch.CURRENT_MAGIC_VALUE).serialize(apiVersionsResponseVersion, new ResponseHeader(0));
@@ -277,7 +277,7 @@ public class SenderTest {
int maxRetries = 1;
Metrics m = new Metrics();
try {
- Sender sender = new Sender(loggerFactory, client, metadata, this.accumulator, false, MAX_REQUEST_SIZE, ACKS_ALL,
+ Sender sender = new Sender(logContext, client, metadata, this.accumulator, false, MAX_REQUEST_SIZE, ACKS_ALL,
maxRetries, m, new SenderMetricsRegistry(), time, REQUEST_TIMEOUT, 50, null, apiVersions);
// do a successful retry
Future<RecordMetadata> future = accumulator.append(tp0, 0L, "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
@@ -324,7 +324,7 @@ public class SenderTest {
int maxRetries = 1;
Metrics m = new Metrics();
try {
- Sender sender = new Sender(loggerFactory, client, metadata, this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL, maxRetries,
+ Sender sender = new Sender(logContext, client, metadata, this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL, maxRetries,
m, new SenderMetricsRegistry(), time, REQUEST_TIMEOUT, 50, null, apiVersions);
// Create a two broker cluster, with partition 0 on broker 0 and partition 1 on broker 1
Cluster cluster1 = TestUtils.clusterWith(2, "test", 2);
@@ -576,7 +576,7 @@ public class SenderTest {
int maxRetries = 10;
Metrics m = new Metrics();
- Sender sender = new Sender(loggerFactory, client, metadata, this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL, maxRetries,
+ Sender sender = new Sender(logContext, client, metadata, this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL, maxRetries,
m, new SenderMetricsRegistry(), time, REQUEST_TIMEOUT, 50, transactionManager, apiVersions);
Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
@@ -618,7 +618,7 @@ public class SenderTest {
int maxRetries = 10;
Metrics m = new Metrics();
SenderMetricsRegistry metricsRegistry = new SenderMetricsRegistry();
- Sender sender = new Sender(loggerFactory, client, metadata, this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL, maxRetries,
+ Sender sender = new Sender(logContext, client, metadata, this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL, maxRetries,
m, metricsRegistry, time, REQUEST_TIMEOUT, 50, transactionManager, apiVersions);
Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
@@ -655,7 +655,7 @@ public class SenderTest {
int maxRetries = 10;
Metrics m = new Metrics();
- Sender sender = new Sender(loggerFactory, client, metadata, this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL, maxRetries,
+ Sender sender = new Sender(logContext, client, metadata, this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL, maxRetries,
m, new SenderMetricsRegistry(), time, REQUEST_TIMEOUT, 50, transactionManager, apiVersions);
Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
@@ -684,7 +684,7 @@ public class SenderTest {
public void testTransactionalSplitBatchAndSend() throws Exception {
ProducerIdAndEpoch producerIdAndEpoch = new ProducerIdAndEpoch(123456L, (short) 0);
TopicPartition tp = new TopicPartition("testSplitBatchAndSend", 1);
- TransactionManager txnManager = new TransactionManager("testSplitBatchAndSend", 60000, 100);
+ TransactionManager txnManager = new TransactionManager(logContext, "testSplitBatchAndSend", 60000, 100);
setupWithTransactionState(txnManager);
doInitTransactions(txnManager, producerIdAndEpoch);
@@ -705,10 +705,10 @@ public class SenderTest {
// Set a good compression ratio.
CompressionRatioEstimator.setEstimation(topic, CompressionType.GZIP, 0.2f);
try (Metrics m = new Metrics()) {
- accumulator = new RecordAccumulator(loggerFactory, batchSize, 1024 * 1024, CompressionType.GZIP, 0L, 0L, m, time,
+ accumulator = new RecordAccumulator(logContext, batchSize, 1024 * 1024, CompressionType.GZIP, 0L, 0L, m, time,
new ApiVersions(), txnManager);
SenderMetricsRegistry metricsRegistry = new SenderMetricsRegistry();
- Sender sender = new Sender(loggerFactory, client, metadata, this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL, maxRetries,
+ Sender sender = new Sender(logContext, client, metadata, this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL, maxRetries,
m, metricsRegistry, time, REQUEST_TIMEOUT, 1000L, txnManager, new ApiVersions());
// Create a two broker cluster, with partition 0 on broker 0 and partition 1 on broker 1
Cluster cluster1 = TestUtils.clusterWith(2, topic, 2);
@@ -826,10 +826,10 @@ public class SenderTest {
metricTags.put("client-id", CLIENT_ID);
MetricConfig metricConfig = new MetricConfig().tags(metricTags);
this.metrics = new Metrics(metricConfig, time);
- this.accumulator = new RecordAccumulator(loggerFactory, batchSize, 1024 * 1024, CompressionType.NONE, 0L, 0L, metrics, time,
+ this.accumulator = new RecordAccumulator(logContext, batchSize, 1024 * 1024, CompressionType.NONE, 0L, 0L, metrics, time,
apiVersions, transactionManager);
this.senderMetricsRegistry = new SenderMetricsRegistry(metricTags.keySet());
- this.sender = new Sender(loggerFactory, this.client, this.metadata, this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL,
+ this.sender = new Sender(logContext, this.client, this.metadata, this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL,
MAX_RETRIES, this.metrics, this.senderMetricsRegistry, this.time, REQUEST_TIMEOUT, 50, transactionManager, apiVersions);
this.metadata.update(this.cluster, Collections.<String>emptySet(), time.milliseconds());
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/2656659e/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
index 1219b9c..53bba1c 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
@@ -120,7 +120,8 @@ public class TransactionManagerTest {
int batchSize = 16 * 1024;
MetricConfig metricConfig = new MetricConfig().tags(metricTags);
this.brokerNode = new Node(0, "localhost", 2211);
- this.transactionManager = new TransactionManager(transactionalId, transactionTimeoutMs, DEFAULT_RETRY_BACKOFF_MS);
+ this.transactionManager = new TransactionManager(logContext, transactionalId, transactionTimeoutMs,
+ DEFAULT_RETRY_BACKOFF_MS);
Metrics metrics = new Metrics(metricConfig, time);
this.accumulator = new RecordAccumulator(logContext, batchSize, 1024 * 1024, CompressionType.NONE, 0L, 0L, metrics, time, apiVersions, transactionManager);
this.sender = new Sender(logContext, this.client, this.metadata, this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL,