Posted to commits@kafka.apache.org by cm...@apache.org on 2019/04/30 16:44:56 UTC

[kafka] branch 2.1 updated: KAFKA-8134: `linger.ms` must be a long

This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.1 by this push:
     new 18de41a  KAFKA-8134: `linger.ms` must be a long
18de41a is described below

commit 18de41a2907d7f50fb291a7e57c300f5d94c167b
Author: Dhruvil Shah <dh...@confluent.io>
AuthorDate: Mon Apr 29 08:59:18 2019 -0700

    KAFKA-8134: `linger.ms` must be a long
    
    Reviewers: Ismael Juma <is...@juma.me.uk>, Colin P. McCabe <cm...@apache.org>
---
 .../kafka/clients/producer/KafkaProducer.java      | 13 ++++--
 .../kafka/clients/producer/ProducerConfig.java     |  2 +-
 .../producer/internals/RecordAccumulator.java      |  8 ++--
 .../producer/internals/RecordAccumulatorTest.java  | 51 ++++++++++------------
 .../clients/producer/internals/SenderTest.java     | 14 +++---
 .../producer/internals/TransactionManagerTest.java |  4 +-
 .../kafka/api/BaseProducerSendTest.scala           | 13 +++---
 .../kafka/api/PlaintextProducerSendTest.scala      |  6 ++-
 .../scala/unit/kafka/server/FetchRequestTest.scala |  1 +
 9 files changed, 60 insertions(+), 52 deletions(-)
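
Before the diffs, a note on what broke. As the commit title says, `linger.ms` must be a long: callers reasonably pass the value as a Long, but the producer had defined it as an INT config, so ConfigDef rejected a boxed Long (and any value outside int range) with a ConfigException at construction time. A minimal repro sketch, assuming a broker at localhost:9092 and string serializers (both placeholders):

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;

    public class LingerMsRepro {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
            // A boxed Long: rejected by the old Type.INT definition with a
            // ConfigException, accepted by the new Type.LONG definition and
            // clamped internally wherever an int is required.
            props.put(ProducerConfig.LINGER_MS_CONFIG, 5000L);
            try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
                System.out.println("producer constructed; linger.ms accepted as long");
            }
        }
    }
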

diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index b2bccb7..8528bba 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -395,7 +395,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
             this.accumulator = new RecordAccumulator(logContext,
                     config.getInt(ProducerConfig.BATCH_SIZE_CONFIG),
                     this.compressionType,
-                    config.getInt(ProducerConfig.LINGER_MS_CONFIG),
+                    lingerMs(config),
                     retryBackoffMs,
                     deliveryTimeoutMs,
                     metrics,
@@ -472,12 +472,17 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
                 apiVersions);
     }
 
+    private static int lingerMs(ProducerConfig config) {
+        return (int) Math.min(config.getLong(ProducerConfig.LINGER_MS_CONFIG), Integer.MAX_VALUE);
+    }
+
     private static int configureDeliveryTimeout(ProducerConfig config, Logger log) {
         int deliveryTimeoutMs = config.getInt(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG);
-        int lingerMs = config.getInt(ProducerConfig.LINGER_MS_CONFIG);
+        int lingerMs = lingerMs(config);
         int requestTimeoutMs = config.getInt(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG);
+        int lingerAndRequestTimeoutMs = (int) Math.min((long) lingerMs + requestTimeoutMs, Integer.MAX_VALUE);
 
-        if (deliveryTimeoutMs < Integer.MAX_VALUE && deliveryTimeoutMs < lingerMs + requestTimeoutMs) {
+        if (deliveryTimeoutMs < Integer.MAX_VALUE && deliveryTimeoutMs < lingerAndRequestTimeoutMs) {
             if (config.originals().containsKey(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG)) {
                 // throw an exception if the user explicitly set an inconsistent value
                 throw new ConfigException(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG
@@ -485,7 +490,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
                     + " + " + ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG);
             } else {
                 // override deliveryTimeoutMs default value to lingerMs + requestTimeoutMs for backward compatibility
-                deliveryTimeoutMs = lingerMs + requestTimeoutMs;
+                deliveryTimeoutMs = lingerAndRequestTimeoutMs;
                 log.warn("{} should be equal to or larger than {} + {}. Setting it to {}.",
                     ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, ProducerConfig.LINGER_MS_CONFIG,
                     ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, deliveryTimeoutMs);
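
The heart of the fix is in the hunk above: the user-facing config becomes a long while RecordAccumulator keeps working in ints, so the value is saturated at Integer.MAX_VALUE instead of overflowing. The same logic, restated as a standalone sketch (helper names here are illustrative, not the patch's):

    // Illustrative restatement of the clamping above:
    static int saturatedInt(long valueMs) {
        // A long linger.ms larger than Integer.MAX_VALUE saturates
        // instead of wrapping to a negative int.
        return (int) Math.min(valueMs, Integer.MAX_VALUE);
    }

    static int saturatedAddMs(int lingerMs, int requestTimeoutMs) {
        // Widen to long before adding: as ints, Integer.MAX_VALUE + 30000
        // wraps negative, which made the old check
        // "deliveryTimeoutMs < lingerMs + requestTimeoutMs" silently pass.
        return (int) Math.min((long) lingerMs + requestTimeoutMs, Integer.MAX_VALUE);
    }
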
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
index c63477d..19f0ce9 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
@@ -260,7 +260,7 @@ public class ProducerConfig extends AbstractConfig {
                                         ACKS_DOC)
                                 .define(COMPRESSION_TYPE_CONFIG, Type.STRING, "none", Importance.HIGH, COMPRESSION_TYPE_DOC)
                                 .define(BATCH_SIZE_CONFIG, Type.INT, 16384, atLeast(0), Importance.MEDIUM, BATCH_SIZE_DOC)
-                                .define(LINGER_MS_CONFIG, Type.INT, 0, atLeast(0), Importance.MEDIUM, LINGER_MS_DOC)
+                                .define(LINGER_MS_CONFIG, Type.LONG, 0, atLeast(0), Importance.MEDIUM, LINGER_MS_DOC)
                                 .define(DELIVERY_TIMEOUT_MS_CONFIG, Type.INT, 120 * 1000, atLeast(0), Importance.MEDIUM, DELIVERY_TIMEOUT_MS_DOC)
                                 .define(CLIENT_ID_CONFIG, Type.STRING, "", Importance.MEDIUM, CommonClientConfigs.CLIENT_ID_DOC)
                                 .define(SEND_BUFFER_CONFIG, Type.INT, 128 * 1024, atLeast(CommonClientConfigs.SEND_BUFFER_LOWER_BOUND), Importance.MEDIUM, CommonClientConfigs.SEND_BUFFER_DOC)
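
This one-line type change is what the clamping above exists to support. Going by ConfigDef's parsing rules (summarized here, slightly simplified), an INT config rejects boxed Longs outright, while a LONG config accepts boxed Integers, boxed Longs, and numeric strings:

    props.put(ProducerConfig.LINGER_MS_CONFIG, 100);    // ok before and after
    props.put(ProducerConfig.LINGER_MS_CONFIG, 100L);   // rejected before, ok now
    props.put(ProducerConfig.LINGER_MS_CONFIG, "100");  // parsed either way
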
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
index 80a9d0c..e6b29f3 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
@@ -72,9 +72,9 @@ public final class RecordAccumulator {
     private final AtomicInteger appendsInProgress;
     private final int batchSize;
     private final CompressionType compression;
-    private final long lingerMs;
+    private final int lingerMs;
     private final long retryBackoffMs;
-    private final long deliveryTimeoutMs;
+    private final int deliveryTimeoutMs;
     private final BufferPool free;
     private final Time time;
     private final ApiVersions apiVersions;
@@ -106,9 +106,9 @@ public final class RecordAccumulator {
     public RecordAccumulator(LogContext logContext,
                              int batchSize,
                              CompressionType compression,
-                             long lingerMs,
+                             int lingerMs,
                              long retryBackoffMs,
-                             long deliveryTimeoutMs,
+                             int deliveryTimeoutMs,
                              Metrics metrics,
                              String metricGrpName,
                              Time time,
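
With the clamp applied once in KafkaProducer, the accumulator can narrow its lingerMs and deliveryTimeoutMs fields from long to int and trust callers to pass already-saturated values. Hypothetical wiring, with the surrounding variables assumed in scope:

    // Sketch of the call contract after this patch: clamp at the boundary,
    // then hand the accumulator a plain int.
    int lingerMs = (int) Math.min(config.getLong(ProducerConfig.LINGER_MS_CONFIG),
                                  Integer.MAX_VALUE);
    RecordAccumulator accum = new RecordAccumulator(logContext, batchSize,
        compressionType, lingerMs, retryBackoffMs, deliveryTimeoutMs,
        metrics, metricGrpName, time, apiVersions, transactionManager, bufferPool);
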
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
index 13b0d1b..5061447 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
@@ -103,7 +103,7 @@ public class RecordAccumulatorTest {
         int batchSize = 1025;
 
         RecordAccumulator accum = createTestRecordAccumulator(
-                batchSize + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 10L * batchSize, CompressionType.NONE, 10L);
+                batchSize + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 10L * batchSize, CompressionType.NONE, 10);
         int appends = expectedNumAppends(batchSize);
         for (int i = 0; i < appends; i++) {
             // append to the first batch
@@ -152,7 +152,7 @@ public class RecordAccumulatorTest {
         int batchSize = 512;
         byte[] value = new byte[2 * batchSize];
         RecordAccumulator accum = createTestRecordAccumulator(
-                batchSize + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 10 * 1024, compressionType, 0L);
+                batchSize + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 10 * 1024, compressionType, 0);
         accum.append(tp1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs);
         assertEquals("Our partition's leader should be ready", Collections.singleton(node1), accum.ready(cluster, time.milliseconds()).readyNodes);
 
@@ -191,7 +191,7 @@ public class RecordAccumulatorTest {
                 new ApiVersionsResponse.ApiVersion(ApiKeys.PRODUCE.id, (short) 0, (short) 2))));
 
         RecordAccumulator accum = createTestRecordAccumulator(
-                batchSize + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 10 * 1024, compressionType, 0L);
+                batchSize + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 10 * 1024, compressionType, 0);
         accum.append(tp1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs);
         assertEquals("Our partition's leader should be ready", Collections.singleton(node1), accum.ready(cluster, time.milliseconds()).readyNodes);
 
@@ -213,7 +213,7 @@ public class RecordAccumulatorTest {
 
     @Test
     public void testLinger() throws Exception {
-        long lingerMs = 10L;
+        int lingerMs = 10;
         RecordAccumulator accum = createTestRecordAccumulator(
                 1024 + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 10 * 1024, CompressionType.NONE, lingerMs);
         accum.append(tp1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs);
@@ -234,7 +234,7 @@ public class RecordAccumulatorTest {
     @Test
     public void testPartialDrain() throws Exception {
         RecordAccumulator accum = createTestRecordAccumulator(
-                1024 + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 10 * 1024, CompressionType.NONE, 10L);
+                1024 + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 10 * 1024, CompressionType.NONE, 10);
         int appends = 1024 / msgSize + 1;
         List<TopicPartition> partitions = asList(tp1, tp2);
         for (TopicPartition tp : partitions) {
@@ -254,7 +254,7 @@ public class RecordAccumulatorTest {
         final int msgs = 10000;
         final int numParts = 2;
         final RecordAccumulator accum = createTestRecordAccumulator(
-            1024 + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 10 * 1024, CompressionType.NONE, 0L);
+            1024 + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 10 * 1024, CompressionType.NONE, 0);
         List<Thread> threads = new ArrayList<>();
         for (int i = 0; i < numThreads; i++) {
             threads.add(new Thread() {
@@ -293,7 +293,7 @@ public class RecordAccumulatorTest {
     @Test
     public void testNextReadyCheckDelay() throws Exception {
         // Next check time will use lingerMs since this test won't trigger any retries/backoff
-        long lingerMs = 10L;
+        int lingerMs = 10;
 
         // test case assumes that the records do not fill the batch completely
         int batchSize = 1025;
@@ -331,10 +331,9 @@ public class RecordAccumulatorTest {
 
     @Test
     public void testRetryBackoff() throws Exception {
-        long lingerMs = Integer.MAX_VALUE / 16;
+        int lingerMs = Integer.MAX_VALUE / 16;
         long retryBackoffMs = Integer.MAX_VALUE / 8;
-        int requestTimeoutMs = Integer.MAX_VALUE / 4;
-        long deliveryTimeoutMs = Integer.MAX_VALUE;
+        int deliveryTimeoutMs = Integer.MAX_VALUE;
         long totalSize = 10 * 1024;
         int batchSize = 1024 + DefaultRecordBatch.RECORD_BATCH_OVERHEAD;
         String metricGrpName = "producer-metrics";
@@ -377,7 +376,7 @@ public class RecordAccumulatorTest {
 
     @Test
     public void testFlush() throws Exception {
-        long lingerMs = Integer.MAX_VALUE;
+        int lingerMs = Integer.MAX_VALUE;
         final RecordAccumulator accum = createTestRecordAccumulator(
                 4 * 1024 + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 64 * 1024, CompressionType.NONE, lingerMs);
 
@@ -419,7 +418,7 @@ public class RecordAccumulatorTest {
     @Test
     public void testAwaitFlushComplete() throws Exception {
         RecordAccumulator accum = createTestRecordAccumulator(
-            4 * 1024 + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 64 * 1024, CompressionType.NONE, Long.MAX_VALUE);
+            4 * 1024 + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 64 * 1024, CompressionType.NONE, Integer.MAX_VALUE);
         accum.append(new TopicPartition(topic, 0), 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs);
 
         accum.beginFlush();
@@ -515,8 +514,8 @@ public class RecordAccumulatorTest {
         assertTrue(accum.hasIncomplete());
     }
 
-    private void doExpireBatchSingle(long deliveryTimeoutMs) throws InterruptedException {
-        long lingerMs = 300L;
+    private void doExpireBatchSingle(int deliveryTimeoutMs) throws InterruptedException {
+        int lingerMs = 300;
         List<Boolean> muteStates = Arrays.asList(false, true);
         Set<Node> readyNodes = null;
         List<ProducerBatch> expiredBatches = new ArrayList<>();
@@ -554,20 +553,20 @@ public class RecordAccumulatorTest {
 
     @Test
     public void testExpiredBatchSingle() throws InterruptedException {
-        doExpireBatchSingle(3200L);
+        doExpireBatchSingle(3200);
     }
 
     @Test
     public void testExpiredBatchSingleMaxValue() throws InterruptedException {
-        doExpireBatchSingle(Long.MAX_VALUE);
+        doExpireBatchSingle(Integer.MAX_VALUE);
     }
 
     @Test
     public void testExpiredBatches() throws InterruptedException {
         long retryBackoffMs = 100L;
-        long lingerMs = 30L;
+        int lingerMs = 30;
         int requestTimeout = 60;
-        long deliveryTimeoutMs = 3200L;
+        int deliveryTimeoutMs = 3200;
 
         // test case assumes that the records do not fill the batch completely
         int batchSize = 1025;
@@ -700,9 +699,8 @@ public class RecordAccumulatorTest {
         // Simulate talking to an older broker, ie. one which supports a lower magic.
         ApiVersions apiVersions = new ApiVersions();
         int batchSize = 1025;
-        int requestTimeoutMs = 1600;
-        long deliveryTimeoutMs = 3200L;
-        long lingerMs = 10L;
+        int deliveryTimeoutMs = 3200;
+        int lingerMs = 10;
         long retryBackoffMs = 100L;
         long totalSize = 10 * batchSize;
         String metricGrpName = "producer-metrics";
@@ -777,7 +775,7 @@ public class RecordAccumulatorTest {
 
         // First set the compression ratio estimation to be good.
         CompressionRatioEstimator.setEstimation(tp1.topic(), CompressionType.GZIP, 0.1f);
-        RecordAccumulator accum = createTestRecordAccumulator(batchSize, bufferCapacity, CompressionType.GZIP, 0L);
+        RecordAccumulator accum = createTestRecordAccumulator(batchSize, bufferCapacity, CompressionType.GZIP, 0);
         int numSplitBatches = prepareSplitBatches(accum, seed, 100, 20);
         assertTrue("There should be some split batches", numSplitBatches > 0);
         // Drain all the split batches.
@@ -829,7 +827,7 @@ public class RecordAccumulatorTest {
 
     @Test
     public void testSoonToExpireBatchesArePickedUpForExpiry() throws InterruptedException {
-        long lingerMs = 500L;
+        int lingerMs = 500;
         int batchSize = 1025;
 
         RecordAccumulator accum = createTestRecordAccumulator(
@@ -993,17 +991,16 @@ public class RecordAccumulatorTest {
     }
 
 
-    private RecordAccumulator createTestRecordAccumulator(int batchSize, long totalSize, CompressionType type, long lingerMs) {
-        long deliveryTimeoutMs = 3200L;
+    private RecordAccumulator createTestRecordAccumulator(int batchSize, long totalSize, CompressionType type, int lingerMs) {
+        int deliveryTimeoutMs = 3200;
         return createTestRecordAccumulator(deliveryTimeoutMs, batchSize, totalSize, type, lingerMs);
     }
 
     /**
      * Return a test RecordAccumulator instance
      */
-    private RecordAccumulator createTestRecordAccumulator(long deliveryTimeoutMs, int batchSize, long totalSize, CompressionType type, long lingerMs) {
+    private RecordAccumulator createTestRecordAccumulator(int deliveryTimeoutMs, int batchSize, long totalSize, CompressionType type, int lingerMs) {
         long retryBackoffMs = 100L;
-        int requestTimeoutMs = 1600;
         String metricGrpName = "producer-metrics";
 
         return new RecordAccumulator(
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
index ffc0763..77c6ac5 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
@@ -1915,14 +1915,14 @@ public class SenderTest {
                                        TopicPartition tp) throws Exception {
         int maxRetries = 1;
         String topic = tp.topic();
-        long deliveryTimeoutMs = 3000L;
+        int deliveryTimeoutMs = 3000;
         long totalSize = 1024 * 1024;
         String metricGrpName = "producer-metrics";
         // Set a good compression ratio.
         CompressionRatioEstimator.setEstimation(topic, CompressionType.GZIP, 0.2f);
         try (Metrics m = new Metrics()) {
             accumulator = new RecordAccumulator(logContext, batchSize, CompressionType.GZIP,
-                0L, 0L, deliveryTimeoutMs, m, metricGrpName, time, new ApiVersions(), txnManager,
+                0, 0L, deliveryTimeoutMs, m, metricGrpName, time, new ApiVersions(), txnManager,
                 new BufferPool(totalSize, batchSize, metrics, time, "producer-internal-metrics"));
             SenderMetricsRegistry senderMetrics = new SenderMetricsRegistry(m);
             Sender sender = new Sender(logContext, client, metadata, this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL, maxRetries,
@@ -2025,7 +2025,7 @@ public class SenderTest {
 
     @Test
     public void testInflightBatchesExpireOnDeliveryTimeout() throws InterruptedException {
-        long deliveryTimeoutMs = 1500L;
+        int deliveryTimeoutMs = 1500;
         setupWithTransactionState(null, true, null);
 
         // Send first ProduceRequest
@@ -2051,7 +2051,7 @@ public class SenderTest {
 
     @Test
     public void testWhenFirstBatchExpireNoSendSecondBatchIfGuaranteeOrder() throws InterruptedException {
-        long deliveryTimeoutMs = 1500L;
+        int deliveryTimeoutMs = 1500;
         setupWithTransactionState(null, true, null);
 
         // Send first ProduceRequest
@@ -2168,7 +2168,7 @@ public class SenderTest {
 
     @Test
     public void testExpiredBatchesInMultiplePartitions() throws Exception {
-        long deliveryTimeoutMs = 1500L;
+        int deliveryTimeoutMs = 1500;
         setupWithTransactionState(null, true, null);
 
         // Send multiple ProduceRequest across multiple partitions.
@@ -2308,9 +2308,9 @@ public class SenderTest {
     }
 
     private void setupWithTransactionState(TransactionManager transactionManager, boolean guaranteeOrder, Map<String, String> metricTags, BufferPool pool) {
-        long deliveryTimeoutMs = 1500L;
+        int deliveryTimeoutMs = 1500;
         String metricGrpName = "producer-metrics";
-        this.accumulator = new RecordAccumulator(logContext, batchSize, CompressionType.NONE, 0L, 0L,
+        this.accumulator = new RecordAccumulator(logContext, batchSize, CompressionType.NONE, 0, 0,
             deliveryTimeoutMs, metrics, metricGrpName, time, apiVersions, transactionManager, pool);
         this.senderMetricsRegistry = new SenderMetricsRegistry(this.metrics);
         this.sender = new Sender(logContext, this.client, this.metadata, this.accumulator, guaranteeOrder, MAX_REQUEST_SIZE, ACKS_ALL,
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
index 826bd46..8845333 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
@@ -118,7 +118,7 @@ public class TransactionManagerTest {
         Map<String, String> metricTags = new LinkedHashMap<>();
         metricTags.put("client-id", CLIENT_ID);
         int batchSize = 16 * 1024;
-        long deliveryTimeoutMs = 3000L;
+        int deliveryTimeoutMs = 3000;
         long totalSize = 1024 * 1024;
         String metricGrpName = "producer-metrics";
         MetricConfig metricConfig = new MetricConfig().tags(metricTags);
@@ -128,7 +128,7 @@ public class TransactionManagerTest {
         Metrics metrics = new Metrics(metricConfig, time);
         SenderMetricsRegistry senderMetrics = new SenderMetricsRegistry(metrics);
 
-        this.accumulator = new RecordAccumulator(logContext, batchSize, CompressionType.NONE, 0L, 0L,
+        this.accumulator = new RecordAccumulator(logContext, batchSize, CompressionType.NONE, 0, 0L,
                 deliveryTimeoutMs, metrics, metricGrpName, time, apiVersions, transactionManager,
                 new BufferPool(totalSize, batchSize, metrics, time, metricGrpName));
         this.sender = new Sender(logContext, this.client, this.metadata, this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL,
diff --git a/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala b/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
index 8799bac..9250b07 100644
--- a/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
+++ b/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
@@ -71,6 +71,7 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
 
   protected def createProducer(brokerList: String,
                                lingerMs: Int = 0,
+                               deliveryTimeoutMs: Int = 2 * 60 * 1000,
                                batchSize: Int = 16384,
                                compressionType: String = "none",
                                maxBlockMs: Long = 60 * 1000L): KafkaProducer[Array[Byte],Array[Byte]] = {
@@ -80,6 +81,7 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
       trustStoreFile = trustStoreFile,
       saslProperties = clientSaslProperties,
       lingerMs = lingerMs,
+      deliveryTimeoutMs = deliveryTimeoutMs,
       maxBlockMs = maxBlockMs)
     registerProducer(producer)
   }
@@ -180,13 +182,14 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
   def testSendCompressedMessageWithCreateTime() {
     val producer = createProducer(brokerList = brokerList,
       compressionType = "gzip",
-      lingerMs = Int.MaxValue)
+      lingerMs = Int.MaxValue,
+      deliveryTimeoutMs = Int.MaxValue)
     sendAndVerifyTimestamp(producer, TimestampType.CREATE_TIME)
   }
 
   @Test
   def testSendNonCompressedMessageWithCreateTime() {
-    val producer = createProducer(brokerList = brokerList, lingerMs = Int.MaxValue)
+    val producer = createProducer(brokerList = brokerList, lingerMs = Int.MaxValue, deliveryTimeoutMs = Int.MaxValue)
     sendAndVerifyTimestamp(producer, TimestampType.CREATE_TIME)
   }
 
@@ -422,7 +425,7 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
    */
   @Test
   def testFlush() {
-    val producer = createProducer(brokerList, lingerMs = Int.MaxValue)
+    val producer = createProducer(brokerList, lingerMs = Int.MaxValue, deliveryTimeoutMs = Int.MaxValue)
     try {
       createTopic(topic, 2, 2)
       val record = new ProducerRecord[Array[Byte], Array[Byte]](topic,
@@ -451,7 +454,7 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
 
     // Test closing from caller thread.
     for (_ <- 0 until 50) {
-      val producer = createProducer(brokerList, lingerMs = Int.MaxValue)
+      val producer = createProducer(brokerList, lingerMs = Int.MaxValue, deliveryTimeoutMs = Int.MaxValue)
       val responses = (0 until numRecords) map (_ => producer.send(record0))
       assertTrue("No request is complete.", responses.forall(!_.isDone()))
       producer.close(0, TimeUnit.MILLISECONDS)
@@ -491,7 +494,7 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
       }
     }
     for (i <- 0 until 50) {
-      val producer = createProducer(brokerList, lingerMs = Int.MaxValue)
+      val producer = createProducer(brokerList, lingerMs = Int.MaxValue, deliveryTimeoutMs = Int.MaxValue)
       try {
         // send message to partition 0
         // Only send the records in the first callback since we close the producer in the callback and no records
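
Why these tests now pin deliveryTimeoutMs: with the overflow gone, lingerMs = Int.MaxValue really is weighed against delivery.timeout.ms, and since the updated helper passes delivery.timeout.ms explicitly, an inconsistent pair throws ConfigException rather than slipping through. A hedged restatement of the check (30s request.timeout.ms default assumed):

    int lingerMs = Integer.MAX_VALUE;
    int requestTimeoutMs = 30_000;
    int deliveryTimeoutMs = 2 * 60 * 1000;  // the helper's old implicit default
    long needed = Math.min((long) lingerMs + requestTimeoutMs, Integer.MAX_VALUE);
    if (deliveryTimeoutMs < Integer.MAX_VALUE && deliveryTimeoutMs < needed) {
        // delivery.timeout.ms was set explicitly by the helper, so the
        // producer throws ConfigException; hence tests that use
        // lingerMs = Int.MaxValue now also pass deliveryTimeoutMs = Int.MaxValue.
    }
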
diff --git a/core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala
index d2ff2a8..321a0c0 100644
--- a/core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala
+++ b/core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala
@@ -45,6 +45,7 @@ class PlaintextProducerSendTest extends BaseProducerSendTest {
   def testBatchSizeZero() {
     val producer = createProducer(brokerList = brokerList,
       lingerMs = Int.MaxValue,
+      deliveryTimeoutMs = Int.MaxValue,
       batchSize = 0)
     sendAndVerify(producer)
   }
@@ -53,13 +54,14 @@ class PlaintextProducerSendTest extends BaseProducerSendTest {
   def testSendCompressedMessageWithLogAppendTime() {
     val producer = createProducer(brokerList = brokerList,
       compressionType = "gzip",
-      lingerMs = Int.MaxValue)
+      lingerMs = Int.MaxValue,
+      deliveryTimeoutMs = Int.MaxValue)
     sendAndVerifyTimestamp(producer, TimestampType.LOG_APPEND_TIME)
   }
 
   @Test
   def testSendNonCompressedMessageWithLogAppendTime() {
-    val producer = createProducer(brokerList = brokerList, lingerMs = Int.MaxValue)
+    val producer = createProducer(brokerList = brokerList, lingerMs = Int.MaxValue, deliveryTimeoutMs = Int.MaxValue)
     sendAndVerifyTimestamp(producer, TimestampType.LOG_APPEND_TIME)
   }
 
diff --git a/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala b/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala
index 86f3314..8ef611a 100644
--- a/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala
@@ -257,6 +257,7 @@ class FetchRequestTest extends BaseRequestTest {
     val batchSize = 4 * msgValueLen
     val producer = TestUtils.createProducer(TestUtils.getBrokerListStrFromServers(servers),
       lingerMs = Int.MaxValue,
+      deliveryTimeoutMs = Int.MaxValue,
       batchSize = batchSize,
       keySerializer = new StringSerializer,
       valueSerializer = new ByteArraySerializer)