You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by cm...@apache.org on 2019/04/29 15:59:29 UTC
[kafka] branch trunk updated: KAFKA-8134: `linger.ms` must be a long
This is an automated email from the ASF dual-hosted git repository.
cmccabe pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new b4532a6 KAFKA-8134: `linger.ms` must be a long
b4532a6 is described below
commit b4532a65f758448c763b65b2fdde1405db2f9d9d
Author: Dhruvil Shah <dh...@confluent.io>
AuthorDate: Mon Apr 29 08:59:18 2019 -0700
KAFKA-8134: `linger.ms` must be a long
Reviewers: Ismael Juma <is...@juma.me.uk>, Colin P. McCabe <cm...@apache.org>
---
.../kafka/clients/producer/KafkaProducer.java | 13 ++++--
.../kafka/clients/producer/ProducerConfig.java | 2 +-
.../producer/internals/RecordAccumulator.java | 8 ++--
.../producer/internals/RecordAccumulatorTest.java | 51 ++++++++++------------
.../clients/producer/internals/SenderTest.java | 8 ++--
.../producer/internals/TransactionManagerTest.java | 4 +-
.../kafka/api/BaseProducerSendTest.scala | 13 +++---
.../kafka/api/PlaintextProducerSendTest.scala | 6 ++-
.../scala/unit/kafka/server/FetchRequestTest.scala | 1 +
9 files changed, 57 insertions(+), 49 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index 83a4d51..b83d98e 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -395,7 +395,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
this.accumulator = new RecordAccumulator(logContext,
config.getInt(ProducerConfig.BATCH_SIZE_CONFIG),
this.compressionType,
- config.getInt(ProducerConfig.LINGER_MS_CONFIG),
+ lingerMs(config),
retryBackoffMs,
deliveryTimeoutMs,
metrics,
@@ -475,12 +475,17 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
apiVersions);
}
+ private static int lingerMs(ProducerConfig config) {
+ return (int) Math.min(config.getLong(ProducerConfig.LINGER_MS_CONFIG), Integer.MAX_VALUE);
+ }
+
private static int configureDeliveryTimeout(ProducerConfig config, Logger log) {
int deliveryTimeoutMs = config.getInt(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG);
- int lingerMs = config.getInt(ProducerConfig.LINGER_MS_CONFIG);
+ int lingerMs = lingerMs(config);
int requestTimeoutMs = config.getInt(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG);
+ int lingerAndRequestTimeoutMs = (int) Math.min((long) lingerMs + requestTimeoutMs, Integer.MAX_VALUE);
- if (deliveryTimeoutMs < Integer.MAX_VALUE && deliveryTimeoutMs < lingerMs + requestTimeoutMs) {
+ if (deliveryTimeoutMs < Integer.MAX_VALUE && deliveryTimeoutMs < lingerAndRequestTimeoutMs) {
if (config.originals().containsKey(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG)) {
// throw an exception if the user explicitly set an inconsistent value
throw new ConfigException(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG
@@ -488,7 +493,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
+ " + " + ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG);
} else {
// override deliveryTimeoutMs default value to lingerMs + requestTimeoutMs for backward compatibility
- deliveryTimeoutMs = lingerMs + requestTimeoutMs;
+ deliveryTimeoutMs = lingerAndRequestTimeoutMs;
log.warn("{} should be equal to or larger than {} + {}. Setting it to {}.",
ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, ProducerConfig.LINGER_MS_CONFIG,
ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, deliveryTimeoutMs);
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
index 9324b9e..758f858 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
@@ -260,7 +260,7 @@ public class ProducerConfig extends AbstractConfig {
ACKS_DOC)
.define(COMPRESSION_TYPE_CONFIG, Type.STRING, "none", Importance.HIGH, COMPRESSION_TYPE_DOC)
.define(BATCH_SIZE_CONFIG, Type.INT, 16384, atLeast(0), Importance.MEDIUM, BATCH_SIZE_DOC)
- .define(LINGER_MS_CONFIG, Type.INT, 0, atLeast(0), Importance.MEDIUM, LINGER_MS_DOC)
+ .define(LINGER_MS_CONFIG, Type.LONG, 0, atLeast(0), Importance.MEDIUM, LINGER_MS_DOC)
.define(DELIVERY_TIMEOUT_MS_CONFIG, Type.INT, 120 * 1000, atLeast(0), Importance.MEDIUM, DELIVERY_TIMEOUT_MS_DOC)
.define(CLIENT_ID_CONFIG, Type.STRING, "", Importance.MEDIUM, CommonClientConfigs.CLIENT_ID_DOC)
.define(SEND_BUFFER_CONFIG, Type.INT, 128 * 1024, atLeast(CommonClientConfigs.SEND_BUFFER_LOWER_BOUND), Importance.MEDIUM, CommonClientConfigs.SEND_BUFFER_DOC)
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
index 80a9d0c..e6b29f3 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
@@ -72,9 +72,9 @@ public final class RecordAccumulator {
private final AtomicInteger appendsInProgress;
private final int batchSize;
private final CompressionType compression;
- private final long lingerMs;
+ private final int lingerMs;
private final long retryBackoffMs;
- private final long deliveryTimeoutMs;
+ private final int deliveryTimeoutMs;
private final BufferPool free;
private final Time time;
private final ApiVersions apiVersions;
@@ -106,9 +106,9 @@ public final class RecordAccumulator {
public RecordAccumulator(LogContext logContext,
int batchSize,
CompressionType compression,
- long lingerMs,
+ int lingerMs,
long retryBackoffMs,
- long deliveryTimeoutMs,
+ int deliveryTimeoutMs,
Metrics metrics,
String metricGrpName,
Time time,
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
index 13b0d1b..5061447 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
@@ -103,7 +103,7 @@ public class RecordAccumulatorTest {
int batchSize = 1025;
RecordAccumulator accum = createTestRecordAccumulator(
- batchSize + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 10L * batchSize, CompressionType.NONE, 10L);
+ batchSize + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 10L * batchSize, CompressionType.NONE, 10);
int appends = expectedNumAppends(batchSize);
for (int i = 0; i < appends; i++) {
// append to the first batch
@@ -152,7 +152,7 @@ public class RecordAccumulatorTest {
int batchSize = 512;
byte[] value = new byte[2 * batchSize];
RecordAccumulator accum = createTestRecordAccumulator(
- batchSize + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 10 * 1024, compressionType, 0L);
+ batchSize + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 10 * 1024, compressionType, 0);
accum.append(tp1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs);
assertEquals("Our partition's leader should be ready", Collections.singleton(node1), accum.ready(cluster, time.milliseconds()).readyNodes);
@@ -191,7 +191,7 @@ public class RecordAccumulatorTest {
new ApiVersionsResponse.ApiVersion(ApiKeys.PRODUCE.id, (short) 0, (short) 2))));
RecordAccumulator accum = createTestRecordAccumulator(
- batchSize + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 10 * 1024, compressionType, 0L);
+ batchSize + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 10 * 1024, compressionType, 0);
accum.append(tp1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs);
assertEquals("Our partition's leader should be ready", Collections.singleton(node1), accum.ready(cluster, time.milliseconds()).readyNodes);
@@ -213,7 +213,7 @@ public class RecordAccumulatorTest {
@Test
public void testLinger() throws Exception {
- long lingerMs = 10L;
+ int lingerMs = 10;
RecordAccumulator accum = createTestRecordAccumulator(
1024 + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 10 * 1024, CompressionType.NONE, lingerMs);
accum.append(tp1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs);
@@ -234,7 +234,7 @@ public class RecordAccumulatorTest {
@Test
public void testPartialDrain() throws Exception {
RecordAccumulator accum = createTestRecordAccumulator(
- 1024 + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 10 * 1024, CompressionType.NONE, 10L);
+ 1024 + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 10 * 1024, CompressionType.NONE, 10);
int appends = 1024 / msgSize + 1;
List<TopicPartition> partitions = asList(tp1, tp2);
for (TopicPartition tp : partitions) {
@@ -254,7 +254,7 @@ public class RecordAccumulatorTest {
final int msgs = 10000;
final int numParts = 2;
final RecordAccumulator accum = createTestRecordAccumulator(
- 1024 + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 10 * 1024, CompressionType.NONE, 0L);
+ 1024 + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 10 * 1024, CompressionType.NONE, 0);
List<Thread> threads = new ArrayList<>();
for (int i = 0; i < numThreads; i++) {
threads.add(new Thread() {
@@ -293,7 +293,7 @@ public class RecordAccumulatorTest {
@Test
public void testNextReadyCheckDelay() throws Exception {
// Next check time will use lingerMs since this test won't trigger any retries/backoff
- long lingerMs = 10L;
+ int lingerMs = 10;
// test case assumes that the records do not fill the batch completely
int batchSize = 1025;
@@ -331,10 +331,9 @@ public class RecordAccumulatorTest {
@Test
public void testRetryBackoff() throws Exception {
- long lingerMs = Integer.MAX_VALUE / 16;
+ int lingerMs = Integer.MAX_VALUE / 16;
long retryBackoffMs = Integer.MAX_VALUE / 8;
- int requestTimeoutMs = Integer.MAX_VALUE / 4;
- long deliveryTimeoutMs = Integer.MAX_VALUE;
+ int deliveryTimeoutMs = Integer.MAX_VALUE;
long totalSize = 10 * 1024;
int batchSize = 1024 + DefaultRecordBatch.RECORD_BATCH_OVERHEAD;
String metricGrpName = "producer-metrics";
@@ -377,7 +376,7 @@ public class RecordAccumulatorTest {
@Test
public void testFlush() throws Exception {
- long lingerMs = Integer.MAX_VALUE;
+ int lingerMs = Integer.MAX_VALUE;
final RecordAccumulator accum = createTestRecordAccumulator(
4 * 1024 + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 64 * 1024, CompressionType.NONE, lingerMs);
@@ -419,7 +418,7 @@ public class RecordAccumulatorTest {
@Test
public void testAwaitFlushComplete() throws Exception {
RecordAccumulator accum = createTestRecordAccumulator(
- 4 * 1024 + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 64 * 1024, CompressionType.NONE, Long.MAX_VALUE);
+ 4 * 1024 + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 64 * 1024, CompressionType.NONE, Integer.MAX_VALUE);
accum.append(new TopicPartition(topic, 0), 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs);
accum.beginFlush();
@@ -515,8 +514,8 @@ public class RecordAccumulatorTest {
assertTrue(accum.hasIncomplete());
}
- private void doExpireBatchSingle(long deliveryTimeoutMs) throws InterruptedException {
- long lingerMs = 300L;
+ private void doExpireBatchSingle(int deliveryTimeoutMs) throws InterruptedException {
+ int lingerMs = 300;
List<Boolean> muteStates = Arrays.asList(false, true);
Set<Node> readyNodes = null;
List<ProducerBatch> expiredBatches = new ArrayList<>();
@@ -554,20 +553,20 @@ public class RecordAccumulatorTest {
@Test
public void testExpiredBatchSingle() throws InterruptedException {
- doExpireBatchSingle(3200L);
+ doExpireBatchSingle(3200);
}
@Test
public void testExpiredBatchSingleMaxValue() throws InterruptedException {
- doExpireBatchSingle(Long.MAX_VALUE);
+ doExpireBatchSingle(Integer.MAX_VALUE);
}
@Test
public void testExpiredBatches() throws InterruptedException {
long retryBackoffMs = 100L;
- long lingerMs = 30L;
+ int lingerMs = 30;
int requestTimeout = 60;
- long deliveryTimeoutMs = 3200L;
+ int deliveryTimeoutMs = 3200;
// test case assumes that the records do not fill the batch completely
int batchSize = 1025;
@@ -700,9 +699,8 @@ public class RecordAccumulatorTest {
// Simulate talking to an older broker, ie. one which supports a lower magic.
ApiVersions apiVersions = new ApiVersions();
int batchSize = 1025;
- int requestTimeoutMs = 1600;
- long deliveryTimeoutMs = 3200L;
- long lingerMs = 10L;
+ int deliveryTimeoutMs = 3200;
+ int lingerMs = 10;
long retryBackoffMs = 100L;
long totalSize = 10 * batchSize;
String metricGrpName = "producer-metrics";
@@ -777,7 +775,7 @@ public class RecordAccumulatorTest {
// First set the compression ratio estimation to be good.
CompressionRatioEstimator.setEstimation(tp1.topic(), CompressionType.GZIP, 0.1f);
- RecordAccumulator accum = createTestRecordAccumulator(batchSize, bufferCapacity, CompressionType.GZIP, 0L);
+ RecordAccumulator accum = createTestRecordAccumulator(batchSize, bufferCapacity, CompressionType.GZIP, 0);
int numSplitBatches = prepareSplitBatches(accum, seed, 100, 20);
assertTrue("There should be some split batches", numSplitBatches > 0);
// Drain all the split batches.
@@ -829,7 +827,7 @@ public class RecordAccumulatorTest {
@Test
public void testSoonToExpireBatchesArePickedUpForExpiry() throws InterruptedException {
- long lingerMs = 500L;
+ int lingerMs = 500;
int batchSize = 1025;
RecordAccumulator accum = createTestRecordAccumulator(
@@ -993,17 +991,16 @@ public class RecordAccumulatorTest {
}
- private RecordAccumulator createTestRecordAccumulator(int batchSize, long totalSize, CompressionType type, long lingerMs) {
- long deliveryTimeoutMs = 3200L;
+ private RecordAccumulator createTestRecordAccumulator(int batchSize, long totalSize, CompressionType type, int lingerMs) {
+ int deliveryTimeoutMs = 3200;
return createTestRecordAccumulator(deliveryTimeoutMs, batchSize, totalSize, type, lingerMs);
}
/**
* Return a test RecordAccumulator instance
*/
- private RecordAccumulator createTestRecordAccumulator(long deliveryTimeoutMs, int batchSize, long totalSize, CompressionType type, long lingerMs) {
+ private RecordAccumulator createTestRecordAccumulator(int deliveryTimeoutMs, int batchSize, long totalSize, CompressionType type, int lingerMs) {
long retryBackoffMs = 100L;
- int requestTimeoutMs = 1600;
String metricGrpName = "producer-metrics";
return new RecordAccumulator(
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
index 6436bcc..430deb6 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
@@ -1910,14 +1910,14 @@ public class SenderTest {
TopicPartition tp) throws Exception {
int maxRetries = 1;
String topic = tp.topic();
- long deliveryTimeoutMs = 3000L;
+ int deliveryTimeoutMs = 3000;
long totalSize = 1024 * 1024;
String metricGrpName = "producer-metrics";
// Set a good compression ratio.
CompressionRatioEstimator.setEstimation(topic, CompressionType.GZIP, 0.2f);
try (Metrics m = new Metrics()) {
accumulator = new RecordAccumulator(logContext, batchSize, CompressionType.GZIP,
- 0L, 0L, deliveryTimeoutMs, m, metricGrpName, time, new ApiVersions(), txnManager,
+ 0, 0L, deliveryTimeoutMs, m, metricGrpName, time, new ApiVersions(), txnManager,
new BufferPool(totalSize, batchSize, metrics, time, "producer-internal-metrics"));
SenderMetricsRegistry senderMetrics = new SenderMetricsRegistry(m);
Sender sender = new Sender(logContext, client, metadata, this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL, maxRetries,
@@ -2410,14 +2410,14 @@ public class SenderTest {
}
private void setupWithTransactionState(TransactionManager transactionManager, boolean guaranteeOrder, BufferPool customPool) {
- long deliveryTimeoutMs = 1500L;
+ int deliveryTimeoutMs = 1500;
long totalSize = 1024 * 1024;
String metricGrpName = "producer-metrics";
MetricConfig metricConfig = new MetricConfig().tags(Collections.singletonMap("client-id", CLIENT_ID));
this.metrics = new Metrics(metricConfig, time);
BufferPool pool = (customPool == null) ? new BufferPool(totalSize, batchSize, metrics, time, metricGrpName) : customPool;
- this.accumulator = new RecordAccumulator(logContext, batchSize, CompressionType.NONE, 0L, 0L,
+ this.accumulator = new RecordAccumulator(logContext, batchSize, CompressionType.NONE, 0, 0L,
deliveryTimeoutMs, metrics, metricGrpName, time, apiVersions, transactionManager, pool);
this.senderMetricsRegistry = new SenderMetricsRegistry(this.metrics);
this.sender = new Sender(logContext, this.client, this.metadata, this.accumulator, guaranteeOrder, MAX_REQUEST_SIZE, ACKS_ALL,
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
index e830c3b..c11c696 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
@@ -118,7 +118,7 @@ public class TransactionManagerTest {
Map<String, String> metricTags = new LinkedHashMap<>();
metricTags.put("client-id", CLIENT_ID);
int batchSize = 16 * 1024;
- long deliveryTimeoutMs = 3000L;
+ int deliveryTimeoutMs = 3000;
long totalSize = 1024 * 1024;
String metricGrpName = "producer-metrics";
MetricConfig metricConfig = new MetricConfig().tags(metricTags);
@@ -128,7 +128,7 @@ public class TransactionManagerTest {
Metrics metrics = new Metrics(metricConfig, time);
SenderMetricsRegistry senderMetrics = new SenderMetricsRegistry(metrics);
- this.accumulator = new RecordAccumulator(logContext, batchSize, CompressionType.NONE, 0L, 0L,
+ this.accumulator = new RecordAccumulator(logContext, batchSize, CompressionType.NONE, 0, 0L,
deliveryTimeoutMs, metrics, metricGrpName, time, apiVersions, transactionManager,
new BufferPool(totalSize, batchSize, metrics, time, metricGrpName));
this.sender = new Sender(logContext, this.client, this.metadata, this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL,
diff --git a/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala b/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
index 2ce16d2..8bb18c9 100644
--- a/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
+++ b/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
@@ -72,6 +72,7 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
protected def createProducer(brokerList: String,
lingerMs: Int = 0,
+ deliveryTimeoutMs: Int = 2 * 60 * 1000,
batchSize: Int = 16384,
compressionType: String = "none",
maxBlockMs: Long = 60 * 1000L): KafkaProducer[Array[Byte],Array[Byte]] = {
@@ -81,6 +82,7 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
trustStoreFile = trustStoreFile,
saslProperties = clientSaslProperties,
lingerMs = lingerMs,
+ deliveryTimeoutMs = deliveryTimeoutMs,
maxBlockMs = maxBlockMs)
registerProducer(producer)
}
@@ -172,13 +174,14 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
def testSendCompressedMessageWithCreateTime() {
val producer = createProducer(brokerList = brokerList,
compressionType = "gzip",
- lingerMs = Int.MaxValue)
+ lingerMs = Int.MaxValue,
+ deliveryTimeoutMs = Int.MaxValue)
sendAndVerifyTimestamp(producer, TimestampType.CREATE_TIME)
}
@Test
def testSendNonCompressedMessageWithCreateTime() {
- val producer = createProducer(brokerList = brokerList, lingerMs = Int.MaxValue)
+ val producer = createProducer(brokerList = brokerList, lingerMs = Int.MaxValue, deliveryTimeoutMs = Int.MaxValue)
sendAndVerifyTimestamp(producer, TimestampType.CREATE_TIME)
}
@@ -414,7 +417,7 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
*/
@Test
def testFlush() {
- val producer = createProducer(brokerList, lingerMs = Int.MaxValue)
+ val producer = createProducer(brokerList, lingerMs = Int.MaxValue, deliveryTimeoutMs = Int.MaxValue)
try {
createTopic(topic, 2, 2)
val record = new ProducerRecord[Array[Byte], Array[Byte]](topic,
@@ -443,7 +446,7 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
// Test closing from caller thread.
for (_ <- 0 until 50) {
- val producer = createProducer(brokerList, lingerMs = Int.MaxValue)
+ val producer = createProducer(brokerList, lingerMs = Int.MaxValue, deliveryTimeoutMs = Int.MaxValue)
val responses = (0 until numRecords) map (_ => producer.send(record0))
assertTrue("No request is complete.", responses.forall(!_.isDone()))
producer.close(Duration.ZERO)
@@ -483,7 +486,7 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
}
}
for (i <- 0 until 50) {
- val producer = createProducer(brokerList, lingerMs = Int.MaxValue)
+ val producer = createProducer(brokerList, lingerMs = Int.MaxValue, deliveryTimeoutMs = Int.MaxValue)
try {
// send message to partition 0
// Only send the records in the first callback since we close the producer in the callback and no records
diff --git a/core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala
index 8ab32af..1bb8989 100644
--- a/core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala
+++ b/core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala
@@ -45,6 +45,7 @@ class PlaintextProducerSendTest extends BaseProducerSendTest {
def testBatchSizeZero() {
val producer = createProducer(brokerList = brokerList,
lingerMs = Int.MaxValue,
+ deliveryTimeoutMs = Int.MaxValue,
batchSize = 0)
sendAndVerify(producer)
}
@@ -53,13 +54,14 @@ class PlaintextProducerSendTest extends BaseProducerSendTest {
def testSendCompressedMessageWithLogAppendTime() {
val producer = createProducer(brokerList = brokerList,
compressionType = "gzip",
- lingerMs = Int.MaxValue)
+ lingerMs = Int.MaxValue,
+ deliveryTimeoutMs = Int.MaxValue)
sendAndVerifyTimestamp(producer, TimestampType.LOG_APPEND_TIME)
}
@Test
def testSendNonCompressedMessageWithLogAppendTime() {
- val producer = createProducer(brokerList = brokerList, lingerMs = Int.MaxValue)
+ val producer = createProducer(brokerList = brokerList, lingerMs = Int.MaxValue, deliveryTimeoutMs = Int.MaxValue)
sendAndVerifyTimestamp(producer, TimestampType.LOG_APPEND_TIME)
}
diff --git a/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala b/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala
index 86f3314..8ef611a 100644
--- a/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala
@@ -257,6 +257,7 @@ class FetchRequestTest extends BaseRequestTest {
val batchSize = 4 * msgValueLen
val producer = TestUtils.createProducer(TestUtils.getBrokerListStrFromServers(servers),
lingerMs = Int.MaxValue,
+ deliveryTimeoutMs = Int.MaxValue,
batchSize = batchSize,
keySerializer = new StringSerializer,
valueSerializer = new ByteArraySerializer)