You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2017/09/25 23:13:15 UTC

kafka git commit: KAFKA-5960; Fix regression in produce version selection on old brokers

Repository: kafka
Updated Branches:
  refs/heads/trunk a96e28eac -> 852297efd


KAFKA-5960; Fix regression in produce version selection on old brokers

Author: Jason Gustafson <ja...@confluent.io>

Reviewers: Apurva Mehta <ap...@confluent.io>, Ismael Juma <is...@juma.me.uk>

Closes #3944 from hachikuji/KAFKA-5960


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/852297ef
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/852297ef
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/852297ef

Branch: refs/heads/trunk
Commit: 852297efd99af04df28710b1b5c99530ab20a072
Parents: a96e28e
Author: Jason Gustafson <ja...@confluent.io>
Authored: Tue Sep 26 00:13:02 2017 +0100
Committer: Ismael Juma <is...@juma.me.uk>
Committed: Tue Sep 26 00:13:02 2017 +0100

----------------------------------------------------------------------
 .../org/apache/kafka/clients/NetworkClient.java |   7 +-
 .../apache/kafka/clients/NodeApiVersions.java   |  38 ++---
 .../internals/ConsumerNetworkClient.java        |   2 +-
 .../clients/producer/internals/Sender.java      |   2 +-
 .../kafka/common/requests/AbstractRequest.java  |  24 +--
 .../requests/ControlledShutdownRequest.java     |   6 +-
 .../kafka/common/requests/FetchRequest.java     |  15 +-
 .../common/requests/ListOffsetRequest.java      |  16 +-
 .../kafka/common/requests/ProduceRequest.java   |  54 ++++---
 .../org/apache/kafka/clients/MockClient.java    |  11 +-
 .../apache/kafka/clients/NetworkClientTest.java |   7 +-
 .../kafka/clients/NodeApiVersionsTest.java      |  24 ++-
 .../clients/producer/internals/SenderTest.java  |   2 +-
 .../common/requests/ProduceRequestTest.java     | 153 +++++++++++++++----
 .../common/requests/RequestResponseTest.java    |  64 +-------
 .../kafka/common/InterBrokerSendThread.scala    |   9 +-
 .../kafka/api/AuthorizerIntegrationTest.scala   |   5 +-
 .../unit/kafka/network/SocketServerTest.scala   |   4 +-
 .../unit/kafka/server/EdgeCaseRequestTest.scala |   3 +-
 .../unit/kafka/server/ProduceRequestTest.scala  |   4 +-
 .../unit/kafka/server/RequestQuotaTest.scala    |   4 +-
 21 files changed, 243 insertions(+), 211 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/852297ef/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
index f8da42c..0fbaff7 100644
--- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
@@ -384,12 +384,13 @@ public class NetworkClient implements KafkaClient {
             // the case when sending the initial ApiVersionRequest which fetches the version
             // information itself.  It is also the case when discoverBrokerVersions is set to false.
             if (versionInfo == null) {
-                version = builder.desiredOrLatestVersion();
+                version = builder.latestAllowedVersion();
                 if (discoverBrokerVersions && log.isTraceEnabled())
                     log.trace("No version information found when sending {} with correlation id {} to node {}. " +
                             "Assuming version {}.", clientRequest.apiKey(), clientRequest.correlationId(), nodeId, version);
             } else {
-                version = versionInfo.usableVersion(clientRequest.apiKey(), builder.desiredVersion());
+                version = versionInfo.usableVersion(clientRequest.apiKey(), builder.oldestAllowedVersion(),
+                        builder.latestAllowedVersion());
             }
             // The call to build may also throw UnsupportedVersionException, if there are essential
             // fields that cannot be represented in the chosen version.
@@ -399,7 +400,7 @@ public class NetworkClient implements KafkaClient {
             // Instead, simply add it to the local queue of aborted requests.
             log.debug("Version mismatch when attempting to send {} with correlation id {} to {}", builder,
                     clientRequest.correlationId(), clientRequest.destination(), e);
-            ClientResponse clientResponse = new ClientResponse(clientRequest.makeHeader(builder.desiredOrLatestVersion()),
+            ClientResponse clientResponse = new ClientResponse(clientRequest.makeHeader(builder.latestAllowedVersion()),
                     clientRequest.callback(), clientRequest.destination(), now, now,
                     false, e, null);
             abortedSends.add(clientResponse);

http://git-wip-us.apache.org/repos/asf/kafka/blob/852297ef/clients/src/main/java/org/apache/kafka/clients/NodeApiVersions.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/NodeApiVersions.java b/clients/src/main/java/org/apache/kafka/clients/NodeApiVersions.java
index dc2e6d1..93a5c72 100644
--- a/clients/src/main/java/org/apache/kafka/clients/NodeApiVersions.java
+++ b/clients/src/main/java/org/apache/kafka/clients/NodeApiVersions.java
@@ -89,24 +89,17 @@ public class NodeApiVersions {
      * Return the most recent version supported by both the node and the local software.
      */
     public short usableVersion(ApiKeys apiKey) {
-        return usableVersion(apiKey, null);
+        return usableVersion(apiKey, apiKey.oldestVersion(), apiKey.latestVersion());
     }
 
     /**
-     * Return the desired version (if usable) or the latest usable version if the desired version is null.
+     * Get the latest version supported by the broker within an allowed range of versions
      */
-    public short usableVersion(ApiKeys apiKey, Short desiredVersion) {
+    public short usableVersion(ApiKeys apiKey, short oldestAllowedVersion, short latestAllowedVersion) {
         UsableVersion usableVersion = usableVersions.get(apiKey);
         if (usableVersion == null)
             throw new UnsupportedVersionException("The broker does not support " + apiKey);
-
-        if (desiredVersion == null) {
-            usableVersion.ensureUsable();
-            return usableVersion.value;
-        } else {
-            usableVersion.ensureUsable(desiredVersion);
-            return desiredVersion;
-        }
+        return usableVersion.latestSupportedVersion(oldestAllowedVersion, latestAllowedVersion);
     }
 
     /**
@@ -226,22 +219,15 @@ public class NodeApiVersions {
             return value == NODE_TOO_NEW;
         }
 
-        private void ensureUsable() {
-            if (value == NODE_TOO_OLD)
-                throw new UnsupportedVersionException("The broker is too old to support " + apiKey +
-                        " version " + apiKey.oldestVersion());
-            else if (value == NODE_TOO_NEW)
-                throw new UnsupportedVersionException("The broker is too new to support " + apiKey +
-                        " version " + apiKey.latestVersion());
+        private short latestSupportedVersion(short minAllowedVersion, short maxAllowedVersion) {
+            short minVersion = (short) Math.max(minAllowedVersion, apiVersion.minVersion);
+            short maxVersion = (short) Math.min(maxAllowedVersion, apiVersion.maxVersion);
+            if (minVersion > maxVersion)
+                throw new UnsupportedVersionException("The broker does not support " + apiKey +
+                        " with version in range [" + minAllowedVersion + "," + maxAllowedVersion + "]. The supported" +
+                        " range is [" + apiVersion.minVersion + "," + apiVersion.maxVersion + "].");
+            return maxVersion;
         }
-
-        private void ensureUsable(short desiredVersion) {
-            if (apiVersion.minVersion > desiredVersion || apiVersion.maxVersion < desiredVersion)
-                throw new UnsupportedVersionException("The broker does not support the requested version " + desiredVersion +
-                        " for api " + apiKey + ". Supported versions are " + apiVersion.minVersion +
-                        " to " + apiVersion.maxVersion + ".");
-        }
-
     }
 
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/852297ef/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
index 86fca9e..2f9cb1b 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
@@ -375,7 +375,7 @@ public class ConsumerNetworkClient implements Closeable {
                     if (authenticationException != null)
                         handler.onFailure(authenticationException);
                     else
-                        handler.onComplete(new ClientResponse(request.makeHeader(request.requestBuilder().desiredOrLatestVersion()),
+                        handler.onComplete(new ClientResponse(request.makeHeader(request.requestBuilder().latestAllowedVersion()),
                             request.callback(), request.destination(), request.createdTimeMs(), now, true,
                             null, null));
                 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/852297ef/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
index 45a2919..1aadf3d 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
@@ -686,7 +686,7 @@ public class Sender implements Runnable {
         if (transactionManager != null && transactionManager.isTransactional()) {
             transactionalId = transactionManager.transactionalId();
         }
-        ProduceRequest.Builder requestBuilder = new ProduceRequest.Builder(minUsedMagic, acks, timeout,
+        ProduceRequest.Builder requestBuilder = ProduceRequest.Builder.forMagic(minUsedMagic, acks, timeout,
                 produceRecordsByPartition, transactionalId);
         RequestCompletionHandler callback = new RequestCompletionHandler() {
             public void onComplete(ClientResponse response) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/852297ef/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
index e093f77..bbf13d6 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
@@ -27,31 +27,37 @@ public abstract class AbstractRequest extends AbstractRequestResponse {
 
     public static abstract class Builder<T extends AbstractRequest> {
         private final ApiKeys apiKey;
-        private final Short desiredVersion;
+        private final short oldestAllowedVersion;
+        private final short latestAllowedVersion;
 
         public Builder(ApiKeys apiKey) {
-            this(apiKey, null);
+            this(apiKey, apiKey.oldestVersion(), apiKey.latestVersion());
         }
 
-        public Builder(ApiKeys apiKey, Short desiredVersion) {
+        public Builder(ApiKeys apiKey, short desiredVersion) {
+            this(apiKey, desiredVersion, desiredVersion);
+        }
+
+        public Builder(ApiKeys apiKey, short oldestAllowedVersion, short latestAllowedVersion) {
             this.apiKey = apiKey;
-            this.desiredVersion = desiredVersion;
+            this.oldestAllowedVersion = oldestAllowedVersion;
+            this.latestAllowedVersion = latestAllowedVersion;
         }
 
         public ApiKeys apiKey() {
             return apiKey;
         }
 
-        public short desiredOrLatestVersion() {
-            return desiredVersion == null ? apiKey.latestVersion() : desiredVersion;
+        public short oldestAllowedVersion() {
+            return oldestAllowedVersion;
         }
 
-        public Short desiredVersion() {
-            return desiredVersion;
+        public short latestAllowedVersion() {
+            return latestAllowedVersion;
         }
 
         public T build() {
-            return build(desiredOrLatestVersion());
+            return build(latestAllowedVersion());
         }
 
         public abstract T build(short version);

http://git-wip-us.apache.org/repos/asf/kafka/blob/852297ef/clients/src/main/java/org/apache/kafka/common/requests/ControlledShutdownRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ControlledShutdownRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ControlledShutdownRequest.java
index c77bd13..e6e8734 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ControlledShutdownRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ControlledShutdownRequest.java
@@ -42,11 +42,7 @@ public class ControlledShutdownRequest extends AbstractRequest {
     public static class Builder extends AbstractRequest.Builder<ControlledShutdownRequest> {
         private final int brokerId;
 
-        public Builder(int brokerId) {
-            this(brokerId, null);
-        }
-
-        public Builder(int brokerId, Short desiredVersion) {
+        public Builder(int brokerId, short desiredVersion) {
             super(ApiKeys.CONTROLLED_SHUTDOWN, desiredVersion);
             this.brokerId = brokerId;
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/852297ef/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
index 3fea26c..1a9e553 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
@@ -204,21 +204,24 @@ public class FetchRequest extends AbstractRequest {
         private int maxBytes = DEFAULT_RESPONSE_MAX_BYTES;
 
         public static Builder forConsumer(int maxWait, int minBytes, LinkedHashMap<TopicPartition, PartitionData> fetchData) {
-            return new Builder(null, CONSUMER_REPLICA_ID, maxWait, minBytes, fetchData, IsolationLevel.READ_UNCOMMITTED);
+            return forConsumer(maxWait, minBytes, fetchData, IsolationLevel.READ_UNCOMMITTED);
         }
 
-        public static Builder forConsumer(int maxWait, int minBytes, LinkedHashMap<TopicPartition, PartitionData> fetchData, IsolationLevel isolationLevel) {
-            return new Builder(null, CONSUMER_REPLICA_ID, maxWait, minBytes, fetchData, isolationLevel);
+        public static Builder forConsumer(int maxWait, int minBytes, LinkedHashMap<TopicPartition, PartitionData> fetchData,
+                                          IsolationLevel isolationLevel) {
+            return new Builder(ApiKeys.FETCH.oldestVersion(), ApiKeys.FETCH.latestVersion(), CONSUMER_REPLICA_ID,
+                    maxWait, minBytes, fetchData, isolationLevel);
         }
 
         public static Builder forReplica(short desiredVersion, int replicaId, int maxWait, int minBytes,
                                          LinkedHashMap<TopicPartition, PartitionData> fetchData) {
-            return new Builder(desiredVersion, replicaId, maxWait, minBytes, fetchData, IsolationLevel.READ_UNCOMMITTED);
+            return new Builder(desiredVersion, desiredVersion, replicaId, maxWait, minBytes, fetchData,
+                    IsolationLevel.READ_UNCOMMITTED);
         }
 
-        private Builder(Short desiredVersion, int replicaId, int maxWait, int minBytes,
+        private Builder(short minVersion, short maxVersion, int replicaId, int maxWait, int minBytes,
                         LinkedHashMap<TopicPartition, PartitionData> fetchData, IsolationLevel isolationLevel) {
-            super(ApiKeys.FETCH, desiredVersion);
+            super(ApiKeys.FETCH, minVersion, maxVersion);
             this.replicaId = replicaId;
             this.maxWait = maxWait;
             this.minBytes = minBytes;

http://git-wip-us.apache.org/repos/asf/kafka/blob/852297ef/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java
index ace582d..e252d3d 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java
@@ -17,7 +17,6 @@
 package org.apache.kafka.common.requests;
 
 import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.protocol.types.ArrayOf;
@@ -104,7 +103,6 @@ public class ListOffsetRequest extends AbstractRequest {
 
     public static class Builder extends AbstractRequest.Builder<ListOffsetRequest> {
         private final int replicaId;
-        private final short minVersion;
         private final IsolationLevel isolationLevel;
         private Map<TopicPartition, PartitionData> offsetData = null;
         private Map<TopicPartition, Long> partitionTimestamps = null;
@@ -114,18 +112,16 @@ public class ListOffsetRequest extends AbstractRequest {
         }
 
         public static Builder forConsumer(boolean requireTimestamp, IsolationLevel isolationLevel) {
-            // If we need a timestamp in the response, the minimum RPC version we can send is v1. Otherwise, v0 is OK.
             short minVersion = 0;
             if (isolationLevel == IsolationLevel.READ_COMMITTED)
                 minVersion = 2;
             else if (requireTimestamp)
                 minVersion = 1;
-            return new Builder(minVersion, null, CONSUMER_REPLICA_ID, isolationLevel);
+            return new Builder(minVersion, ApiKeys.LIST_OFFSETS.latestVersion(), CONSUMER_REPLICA_ID, isolationLevel);
         }
 
-        private Builder(short minVersion, Short desiredVersion, int replicaId, IsolationLevel isolationLevel) {
-            super(ApiKeys.LIST_OFFSETS, desiredVersion);
-            this.minVersion = minVersion;
+        private Builder(short minVersion, short maxVersion, int replicaId, IsolationLevel isolationLevel) {
+            super(ApiKeys.LIST_OFFSETS, minVersion, maxVersion);
             this.replicaId = replicaId;
             this.isolationLevel = isolationLevel;
         }
@@ -142,10 +138,6 @@ public class ListOffsetRequest extends AbstractRequest {
 
         @Override
         public ListOffsetRequest build(short version) {
-            if (version < minVersion) {
-                throw new UnsupportedVersionException("Cannot create a v" + version + " ListOffsetRequest because " +
-                    "we require features supported only in " + minVersion + " or later.");
-            }
             if (version == 0) {
                 if (offsetData == null) {
                     if (partitionTimestamps == null) {
@@ -184,7 +176,7 @@ public class ListOffsetRequest extends AbstractRequest {
             if (partitionTimestamps != null) {
                 bld.append(", partitionTimestamps=").append(partitionTimestamps);
             }
-            bld.append(", minVersion=").append(minVersion);
+            bld.append(", isolationLevel=").append(isolationLevel);
             bld.append(")");
             return bld.toString();
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/852297ef/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java
index 8ab0b20..fbc7f76 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java
@@ -17,7 +17,6 @@
 package org.apache.kafka.common.requests;
 
 import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.protocol.types.ArrayOf;
@@ -116,37 +115,53 @@ public class ProduceRequest extends AbstractRequest {
     }
 
     public static class Builder extends AbstractRequest.Builder<ProduceRequest> {
-        private final byte magic;
         private final short acks;
         private final int timeout;
         private final Map<TopicPartition, MemoryRecords> partitionRecords;
         private final String transactionalId;
 
-        public Builder(byte magic,
-                       short acks,
-                       int timeout,
-                       Map<TopicPartition, MemoryRecords> partitionRecords,
-                       String transactionalId) {
-            super(ApiKeys.PRODUCE, (short) (magic == RecordBatch.MAGIC_VALUE_V2 ? ApiKeys.PRODUCE.latestVersion() : 2));
-            this.magic = magic;
+        public static Builder forCurrentMagic(short acks,
+                                              int timeout,
+                                              Map<TopicPartition, MemoryRecords> partitionRecords) {
+            return forMagic(RecordBatch.CURRENT_MAGIC_VALUE, acks, timeout, partitionRecords, null);
+        }
+
+        public static Builder forMagic(byte magic,
+                                       short acks,
+                                       int timeout,
+                                       Map<TopicPartition, MemoryRecords> partitionRecords,
+                                       String transactionalId) {
+            // Message format upgrades correspond with a bump in the produce request version. Older
+            // message format versions are generally not supported by the produce request versions
+            // following the bump.
+
+            final short minVersion;
+            final short maxVersion;
+            if (magic < RecordBatch.MAGIC_VALUE_V2) {
+                minVersion = 2;
+                maxVersion = 2;
+            } else {
+                minVersion = 3;
+                maxVersion = ApiKeys.PRODUCE.latestVersion();
+            }
+            return new Builder(minVersion, maxVersion, acks, timeout, partitionRecords, transactionalId);
+        }
+
+        private Builder(short minVersion,
+                        short maxVersion,
+                        short acks,
+                        int timeout,
+                        Map<TopicPartition, MemoryRecords> partitionRecords,
+                        String transactionalId) {
+            super(ApiKeys.PRODUCE, minVersion, maxVersion);
             this.acks = acks;
             this.timeout = timeout;
             this.partitionRecords = partitionRecords;
             this.transactionalId = transactionalId;
         }
 
-        public Builder(byte magic,
-                       short acks,
-                       int timeout,
-                       Map<TopicPartition, MemoryRecords> partitionRecords) {
-            this(magic, acks, timeout, partitionRecords, null);
-        }
-
         @Override
         public ProduceRequest build(short version) {
-            if (version < 2)
-                throw new UnsupportedVersionException("ProduceRequest versions older than 2 are not supported.");
-
             return new ProduceRequest(version, acks, timeout, partitionRecords, transactionalId);
         }
 
@@ -154,7 +169,6 @@ public class ProduceRequest extends AbstractRequest {
         public String toString() {
             StringBuilder bld = new StringBuilder();
             bld.append("(type=ProduceRequest")
-                    .append(", magic=").append(magic)
                     .append(", acks=").append(acks)
                     .append(", timeout=").append(timeout)
                     .append(", partitionRecords=(").append(partitionRecords)

http://git-wip-us.apache.org/repos/asf/kafka/blob/852297ef/clients/src/test/java/org/apache/kafka/clients/MockClient.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/MockClient.java b/clients/src/test/java/org/apache/kafka/clients/MockClient.java
index 66ff253..4037112 100644
--- a/clients/src/test/java/org/apache/kafka/clients/MockClient.java
+++ b/clients/src/test/java/org/apache/kafka/clients/MockClient.java
@@ -147,7 +147,7 @@ public class MockClient implements KafkaClient {
         while (iter.hasNext()) {
             ClientRequest request = iter.next();
             if (request.destination().equals(node)) {
-                short version = request.requestBuilder().desiredOrLatestVersion();
+                short version = request.requestBuilder().latestAllowedVersion();
                 responses.add(new ClientResponse(request.makeHeader(version), request.callback(), request.destination(),
                         request.createdTimeMs(), now, true, null, null));
                 iter.remove();
@@ -165,7 +165,8 @@ public class MockClient implements KafkaClient {
                 continue;
 
             AbstractRequest.Builder<?> builder = request.requestBuilder();
-            short version = nodeApiVersions.usableVersion(request.apiKey(), builder.desiredVersion());
+            short version = nodeApiVersions.usableVersion(request.apiKey(), builder.oldestAllowedVersion(),
+                    builder.latestAllowedVersion());
             AbstractRequest abstractRequest = request.requestBuilder().build(version);
             if (!futureResp.requestMatcher.matches(abstractRequest))
                 throw new IllegalStateException("Request matcher did not match next-in-line request " + abstractRequest);
@@ -234,7 +235,7 @@ public class MockClient implements KafkaClient {
     public void respondToRequest(ClientRequest clientRequest, AbstractResponse response) {
         AbstractRequest request = clientRequest.requestBuilder().build();
         requests.remove(clientRequest);
-        short version = clientRequest.requestBuilder().desiredOrLatestVersion();
+        short version = clientRequest.requestBuilder().latestAllowedVersion();
         responses.add(new ClientResponse(clientRequest.makeHeader(version), clientRequest.callback(), clientRequest.destination(),
                 clientRequest.createdTimeMs(), time.milliseconds(), false, null, response));
     }
@@ -242,7 +243,7 @@ public class MockClient implements KafkaClient {
 
     public void respond(AbstractResponse response, boolean disconnected) {
         ClientRequest request = requests.remove();
-        short version = request.requestBuilder().desiredOrLatestVersion();
+        short version = request.requestBuilder().latestAllowedVersion();
         responses.add(new ClientResponse(request.makeHeader(version), request.callback(), request.destination(),
                 request.createdTimeMs(), time.milliseconds(), disconnected, null, response));
     }
@@ -257,7 +258,7 @@ public class MockClient implements KafkaClient {
             ClientRequest request = iterator.next();
             if (request.destination().equals(node.idString())) {
                 iterator.remove();
-                short version = request.requestBuilder().desiredOrLatestVersion();
+                short version = request.requestBuilder().latestAllowedVersion();
                 responses.add(new ClientResponse(request.makeHeader(version), request.callback(), request.destination(),
                         request.createdTimeMs(), time.milliseconds(), disconnected, null, response));
                 return;

http://git-wip-us.apache.org/repos/asf/kafka/blob/852297ef/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
index 86ad2ff..ff0f8f2 100644
--- a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
@@ -23,7 +23,6 @@ import org.apache.kafka.common.network.NetworkReceive;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.types.Struct;
 import org.apache.kafka.common.record.MemoryRecords;
-import org.apache.kafka.common.record.RecordBatch;
 import org.apache.kafka.common.requests.ApiVersionsResponse;
 import org.apache.kafka.common.requests.MetadataRequest;
 import org.apache.kafka.common.requests.ProduceRequest;
@@ -116,7 +115,7 @@ public class NetworkClientTest {
         client.poll(1, time.milliseconds());
         assertTrue("The client should be ready", client.isReady(node, time.milliseconds()));
 
-        ProduceRequest.Builder builder = new ProduceRequest.Builder(RecordBatch.CURRENT_MAGIC_VALUE, (short) 1, 1000,
+        ProduceRequest.Builder builder = ProduceRequest.Builder.forCurrentMagic((short) 1, 1000,
                 Collections.<TopicPartition, MemoryRecords>emptyMap());
         ClientRequest request = client.newClientRequest(node.idString(), builder, time.milliseconds(), true);
         client.send(request, time.milliseconds());
@@ -134,7 +133,7 @@ public class NetworkClientTest {
 
     private void checkSimpleRequestResponse(NetworkClient networkClient) {
         awaitReady(networkClient, node); // has to be before creating any request, as it may send ApiVersionsRequest and its response is mocked with correlation id 0
-        ProduceRequest.Builder builder = new ProduceRequest.Builder(RecordBatch.CURRENT_MAGIC_VALUE, (short) 1, 1000,
+        ProduceRequest.Builder builder = ProduceRequest.Builder.forCurrentMagic((short) 1, 1000,
                         Collections.<TopicPartition, MemoryRecords>emptyMap());
         TestCallbackHandler handler = new TestCallbackHandler();
         ClientRequest request = networkClient.newClientRequest(
@@ -179,7 +178,7 @@ public class NetworkClientTest {
     @Test
     public void testRequestTimeout() {
         awaitReady(client, node); // has to be before creating any request, as it may send ApiVersionsRequest and its response is mocked with correlation id 0
-        ProduceRequest.Builder builder = new ProduceRequest.Builder(RecordBatch.CURRENT_MAGIC_VALUE, (short) 1,
+        ProduceRequest.Builder builder = ProduceRequest.Builder.forCurrentMagic((short) 1,
                 1000, Collections.<TopicPartition, MemoryRecords>emptyMap());
         TestCallbackHandler handler = new TestCallbackHandler();
         long now = time.milliseconds();

http://git-wip-us.apache.org/repos/asf/kafka/blob/852297ef/clients/src/test/java/org/apache/kafka/clients/NodeApiVersionsTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/NodeApiVersionsTest.java b/clients/src/test/java/org/apache/kafka/clients/NodeApiVersionsTest.java
index 6b1d92a..266d2c9 100644
--- a/clients/src/test/java/org/apache/kafka/clients/NodeApiVersionsTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/NodeApiVersionsTest.java
@@ -91,34 +91,32 @@ public class NodeApiVersionsTest {
     }
 
     @Test
-    public void testUsableVersionNoDesiredVersionReturnsLatestUsable() {
-        NodeApiVersions apiVersions = NodeApiVersions.create(Collections.singleton(
-                new ApiVersion(ApiKeys.PRODUCE.id, (short) 1, (short) 3)));
-        assertEquals(3, apiVersions.usableVersion(ApiKeys.PRODUCE, null));
-    }
-
-    @Test
     public void testDesiredVersion() {
         NodeApiVersions apiVersions = NodeApiVersions.create(Collections.singleton(
                 new ApiVersion(ApiKeys.PRODUCE.id, (short) 1, (short) 3)));
         assertEquals(3, apiVersions.usableVersion(ApiKeys.PRODUCE));
-        assertEquals(1, apiVersions.usableVersion(ApiKeys.PRODUCE, (short) 1));
-        assertEquals(2, apiVersions.usableVersion(ApiKeys.PRODUCE, (short) 2));
-        assertEquals(3, apiVersions.usableVersion(ApiKeys.PRODUCE, (short) 3));
+        assertEquals(1, apiVersions.usableVersion(ApiKeys.PRODUCE, (short) 0, (short) 1));
+        assertEquals(1, apiVersions.usableVersion(ApiKeys.PRODUCE, (short) 1, (short) 1));
+        assertEquals(2, apiVersions.usableVersion(ApiKeys.PRODUCE, (short) 1, (short) 2));
+        assertEquals(3, apiVersions.usableVersion(ApiKeys.PRODUCE, (short) 1, (short) 3));
+        assertEquals(2, apiVersions.usableVersion(ApiKeys.PRODUCE, (short) 2, (short) 2));
+        assertEquals(3, apiVersions.usableVersion(ApiKeys.PRODUCE, (short) 2, (short) 3));
+        assertEquals(3, apiVersions.usableVersion(ApiKeys.PRODUCE, (short) 3, (short) 3));
+        assertEquals(3, apiVersions.usableVersion(ApiKeys.PRODUCE, (short) 3, (short) 4));
     }
 
     @Test(expected = UnsupportedVersionException.class)
     public void testDesiredVersionTooLarge() {
         NodeApiVersions apiVersions = NodeApiVersions.create(Collections.singleton(
                 new ApiVersion(ApiKeys.PRODUCE.id, (short) 1, (short) 2)));
-        apiVersions.usableVersion(ApiKeys.PRODUCE, (short) 3);
+        apiVersions.usableVersion(ApiKeys.PRODUCE, (short) 3, (short) 4);
     }
 
     @Test(expected = UnsupportedVersionException.class)
     public void testDesiredVersionTooSmall() {
         NodeApiVersions apiVersions = NodeApiVersions.create(Collections.singleton(
-                new ApiVersion(ApiKeys.PRODUCE.id, (short) 1, (short) 2)));
-        apiVersions.usableVersion(ApiKeys.PRODUCE, (short) 0);
+                new ApiVersion(ApiKeys.PRODUCE.id, (short) 2, (short) 3)));
+        apiVersions.usableVersion(ApiKeys.PRODUCE, (short) 0, (short) 1);
     }
 
     @Test(expected = UnsupportedVersionException.class)

http://git-wip-us.apache.org/repos/asf/kafka/blob/852297ef/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
index ecf77aa..a45d9ac 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
@@ -258,7 +258,7 @@ public class SenderTest {
 
         for (int i = 1; i <= 3; i++) {
             int throttleTimeMs = 100 * i;
-            ProduceRequest.Builder builder = new ProduceRequest.Builder(RecordBatch.CURRENT_MAGIC_VALUE, (short) 1, 1000,
+            ProduceRequest.Builder builder = ProduceRequest.Builder.forCurrentMagic((short) 1, 1000,
                             Collections.<TopicPartition, MemoryRecords>emptyMap());
             ClientRequest request = client.newClientRequest(node.idString(), builder, time.milliseconds(), true, null);
             client.send(request, time.milliseconds());

http://git-wip-us.apache.org/repos/asf/kafka/blob/852297ef/clients/src/test/java/org/apache/kafka/common/requests/ProduceRequestTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/ProduceRequestTest.java b/clients/src/test/java/org/apache/kafka/common/requests/ProduceRequestTest.java
index 0e8f382..ef17c96 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/ProduceRequestTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/ProduceRequestTest.java
@@ -18,16 +18,25 @@
 package org.apache.kafka.common.requests;
 
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.record.InvalidRecordException;
 import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.common.record.MemoryRecordsBuilder;
 import org.apache.kafka.common.record.RecordBatch;
 import org.apache.kafka.common.record.SimpleRecord;
+import org.apache.kafka.common.record.TimestampType;
 import org.junit.Test;
 
+import java.nio.ByteBuffer;
 import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
 
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 public class ProduceRequestTest {
 
@@ -37,18 +46,10 @@ public class ProduceRequestTest {
 
     @Test
     public void shouldBeFlaggedAsTransactionalWhenTransactionalRecords() throws Exception {
-        final MemoryRecords memoryRecords = MemoryRecords.withTransactionalRecords(0,
-                                                                                   CompressionType.NONE,
-                                                                                   1L,
-                                                                                   (short) 1,
-                                                                                   1,
-                                                                                   1,
-                                                                                   simpleRecord);
-        final ProduceRequest request = new ProduceRequest.Builder(RecordBatch.CURRENT_MAGIC_VALUE,
-                                                                  (short) -1,
-                                                                  10,
-                                                                  Collections.singletonMap(
-                                                                          new TopicPartition("topic", 1), memoryRecords)).build();
+        final MemoryRecords memoryRecords = MemoryRecords.withTransactionalRecords(0, CompressionType.NONE, 1L,
+                (short) 1, 1, 1, simpleRecord);
+        final ProduceRequest request = ProduceRequest.Builder.forCurrentMagic((short) -1,
+                10, Collections.singletonMap(new TopicPartition("topic", 1), memoryRecords)).build();
         assertTrue(request.isTransactional());
     }
 
@@ -66,30 +67,116 @@ public class ProduceRequestTest {
 
     @Test
     public void shouldBeFlaggedAsIdempotentWhenIdempotentRecords() throws Exception {
-        final MemoryRecords memoryRecords = MemoryRecords.withIdempotentRecords(1,
-                                                                                CompressionType.NONE,
-                                                                                1L,
-                                                                                (short) 1,
-                                                                                1,
-                                                                                1,
-                                                                                simpleRecord);
-
-        final ProduceRequest request = new ProduceRequest.Builder(RecordBatch.CURRENT_MAGIC_VALUE,
-                                                                  (short) -1,
-                                                                  10,
-                                                                  Collections.singletonMap(
-                                                                          new TopicPartition("topic", 1), memoryRecords)).build();
+        final MemoryRecords memoryRecords = MemoryRecords.withIdempotentRecords(1, CompressionType.NONE, 1L,
+                (short) 1, 1, 1, simpleRecord);
+        final ProduceRequest request = ProduceRequest.Builder.forCurrentMagic((short) -1, 10,
+                Collections.singletonMap(new TopicPartition("topic", 1), memoryRecords)).build();
         assertTrue(request.isIdempotent());
+    }
+
+    @Test
+    public void testBuildWithOldMessageFormat() {
+        ByteBuffer buffer = ByteBuffer.allocate(256);
+        MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, RecordBatch.MAGIC_VALUE_V1, CompressionType.NONE,
+                TimestampType.CREATE_TIME, 0L);
+        builder.append(10L, null, "a".getBytes());
+        Map<TopicPartition, MemoryRecords> produceData = new HashMap<>();
+        produceData.put(new TopicPartition("test", 0), builder.build());
+
+        ProduceRequest.Builder requestBuilder = ProduceRequest.Builder.forMagic(RecordBatch.MAGIC_VALUE_V1, (short) 1,
+                5000, produceData, null);
+        assertEquals(2, requestBuilder.oldestAllowedVersion());
+        assertEquals(2, requestBuilder.latestAllowedVersion());
+    }
+
+    @Test
+    public void testBuildWithCurrentMessageFormat() {
+        ByteBuffer buffer = ByteBuffer.allocate(256);
+        MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, RecordBatch.CURRENT_MAGIC_VALUE,
+                CompressionType.NONE, TimestampType.CREATE_TIME, 0L);
+        builder.append(10L, null, "a".getBytes());
+        Map<TopicPartition, MemoryRecords> produceData = new HashMap<>();
+        produceData.put(new TopicPartition("test", 0), builder.build());
+
+        ProduceRequest.Builder requestBuilder = ProduceRequest.Builder.forMagic(RecordBatch.CURRENT_MAGIC_VALUE,
+                (short) 1, 5000, produceData, null);
+        assertEquals(3, requestBuilder.oldestAllowedVersion());
+        assertEquals(ApiKeys.PRODUCE.latestVersion(), requestBuilder.latestAllowedVersion());
+    }
+
+    @Test
+    public void testV3AndAboveShouldContainOnlyOneRecordBatch() {
+        ByteBuffer buffer = ByteBuffer.allocate(256);
+        MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, CompressionType.NONE, TimestampType.CREATE_TIME, 0L);
+        builder.append(10L, null, "a".getBytes());
+        builder.close();
+
+        builder = MemoryRecords.builder(buffer, CompressionType.NONE, TimestampType.CREATE_TIME, 1L);
+        builder.append(11L, "1".getBytes(), "b".getBytes());
+        builder.append(12L, null, "c".getBytes());
+        builder.close();
+
+        buffer.flip();
+
+        Map<TopicPartition, MemoryRecords> produceData = new HashMap<>();
+        produceData.put(new TopicPartition("test", 0), MemoryRecords.readableRecords(buffer));
+        ProduceRequest.Builder requestBuilder = ProduceRequest.Builder.forCurrentMagic((short) 1, 5000, produceData);
+        assertThrowsInvalidRecordExceptionForAllVersions(requestBuilder);
+    }
+
+    @Test
+    public void testV3AndAboveCannotHaveNoRecordBatches() {
+        Map<TopicPartition, MemoryRecords> produceData = new HashMap<>();
+        produceData.put(new TopicPartition("test", 0), MemoryRecords.EMPTY);
+        ProduceRequest.Builder requestBuilder = ProduceRequest.Builder.forCurrentMagic((short) 1, 5000, produceData);
+        assertThrowsInvalidRecordExceptionForAllVersions(requestBuilder);
+    }
+
+    @Test
+    public void testV3AndAboveCannotUseMagicV0() {
+        ByteBuffer buffer = ByteBuffer.allocate(256);
+        MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, RecordBatch.MAGIC_VALUE_V0, CompressionType.NONE,
+                TimestampType.NO_TIMESTAMP_TYPE, 0L);
+        builder.append(10L, null, "a".getBytes());
+
+        Map<TopicPartition, MemoryRecords> produceData = new HashMap<>();
+        produceData.put(new TopicPartition("test", 0), builder.build());
+        ProduceRequest.Builder requestBuilder = ProduceRequest.Builder.forCurrentMagic((short) 1, 5000, produceData);
+        assertThrowsInvalidRecordExceptionForAllVersions(requestBuilder);
+    }
+
+    @Test
+    public void testV3AndAboveCannotUseMagicV1() {
+        ByteBuffer buffer = ByteBuffer.allocate(256);
+        MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, RecordBatch.MAGIC_VALUE_V1, CompressionType.NONE,
+                TimestampType.CREATE_TIME, 0L);
+        builder.append(10L, null, "a".getBytes());
+
+        Map<TopicPartition, MemoryRecords> produceData = new HashMap<>();
+        produceData.put(new TopicPartition("test", 0), builder.build());
+        ProduceRequest.Builder requestBuilder = ProduceRequest.Builder.forCurrentMagic((short) 1, 5000, produceData);
+        assertThrowsInvalidRecordExceptionForAllVersions(requestBuilder);
+    }
+
+    private void assertThrowsInvalidRecordExceptionForAllVersions(ProduceRequest.Builder builder) {
+        for (short version = builder.oldestAllowedVersion(); version <= builder.latestAllowedVersion(); version++) {
+            assertThrowsInvalidRecordException(builder, version); // bound is inclusive: the latest allowed version is checked too
+        }
+    }
 
+    private void assertThrowsInvalidRecordException(ProduceRequest.Builder builder, short version) {
+        try {
+            builder.build(version).toStruct();
+            fail("Builder did not raise " + InvalidRecordException.class.getName() + " as expected");
+        } catch (RuntimeException e) {
+            assertTrue("Unexpected exception type " + e.getClass().getName(),
+                    InvalidRecordException.class.isAssignableFrom(e.getClass()));
+        }
     }
 
     private ProduceRequest createNonIdempotentNonTransactionalRecords() {
-        final MemoryRecords memoryRecords = MemoryRecords.withRecords(CompressionType.NONE,
-                                                                      simpleRecord);
-        return new ProduceRequest.Builder(RecordBatch.CURRENT_MAGIC_VALUE,
-                                          (short) -1,
-                                          10,
-                                          Collections.singletonMap(
-                                                  new TopicPartition("topic", 1), memoryRecords)).build();
+        final MemoryRecords memoryRecords = MemoryRecords.withRecords(CompressionType.NONE, simpleRecord);
+        return ProduceRequest.Builder.forCurrentMagic((short) -1, 10,
+                Collections.singletonMap(new TopicPartition("topic", 1), memoryRecords)).build();
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/852297ef/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index a76cc84..e96b188 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -17,14 +17,14 @@
 package org.apache.kafka.common.requests;
 
 import org.apache.kafka.clients.admin.NewPartitions;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.acl.AccessControlEntry;
 import org.apache.kafka.common.acl.AccessControlEntryFilter;
 import org.apache.kafka.common.acl.AclBinding;
 import org.apache.kafka.common.acl.AclBindingFilter;
 import org.apache.kafka.common.acl.AclOperation;
 import org.apache.kafka.common.acl.AclPermissionType;
-import org.apache.kafka.common.Node;
-import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.InvalidReplicaAssignmentException;
 import org.apache.kafka.common.errors.InvalidTopicException;
 import org.apache.kafka.common.errors.NotCoordinatorException;
@@ -39,12 +39,9 @@ import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.protocol.SecurityProtocol;
 import org.apache.kafka.common.protocol.types.Struct;
 import org.apache.kafka.common.record.CompressionType;
-import org.apache.kafka.common.record.InvalidRecordException;
 import org.apache.kafka.common.record.MemoryRecords;
-import org.apache.kafka.common.record.MemoryRecordsBuilder;
 import org.apache.kafka.common.record.RecordBatch;
 import org.apache.kafka.common.record.SimpleRecord;
-import org.apache.kafka.common.record.TimestampType;
 import org.apache.kafka.common.requests.CreateAclsRequest.AclCreation;
 import org.apache.kafka.common.requests.CreateAclsResponse.AclCreationResponse;
 import org.apache.kafka.common.requests.DeleteAclsResponse.AclDeletionResult;
@@ -397,56 +394,6 @@ public class RequestResponseTest {
         assertEquals("Response data does not match", responseData, v2Response.responses());
     }
 
-    @Test(expected = InvalidRecordException.class)
-    public void produceRequestV3ShouldContainOnlyOneRecordBatch() {
-        ByteBuffer buffer = ByteBuffer.allocate(256);
-        MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, CompressionType.NONE, TimestampType.CREATE_TIME, 0L);
-        builder.append(10L, null, "a".getBytes());
-        builder.close();
-
-        builder = MemoryRecords.builder(buffer, CompressionType.NONE, TimestampType.CREATE_TIME, 1L);
-        builder.append(11L, "1".getBytes(), "b".getBytes());
-        builder.append(12L, null, "c".getBytes());
-        builder.close();
-
-        buffer.flip();
-
-        Map<TopicPartition, MemoryRecords> produceData = new HashMap<>();
-        produceData.put(new TopicPartition("test", 0), MemoryRecords.readableRecords(buffer));
-        new ProduceRequest.Builder(RecordBatch.CURRENT_MAGIC_VALUE, (short) 1, 5000, produceData).build().toStruct();
-    }
-
-    @Test(expected = InvalidRecordException.class)
-    public void produceRequestV3CannotHaveNoRecordBatches() {
-        Map<TopicPartition, MemoryRecords> produceData = new HashMap<>();
-        produceData.put(new TopicPartition("test", 0), MemoryRecords.EMPTY);
-        new ProduceRequest.Builder(RecordBatch.CURRENT_MAGIC_VALUE, (short) 1, 5000, produceData).build().toStruct();
-    }
-
-    @Test(expected = InvalidRecordException.class)
-    public void produceRequestV3CannotUseMagicV0() {
-        ByteBuffer buffer = ByteBuffer.allocate(256);
-        MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, RecordBatch.MAGIC_VALUE_V0, CompressionType.NONE,
-                TimestampType.NO_TIMESTAMP_TYPE, 0L);
-        builder.append(10L, null, "a".getBytes());
-
-        Map<TopicPartition, MemoryRecords> produceData = new HashMap<>();
-        produceData.put(new TopicPartition("test", 0), builder.build());
-        new ProduceRequest.Builder(RecordBatch.CURRENT_MAGIC_VALUE, (short) 1, 5000, produceData).build().toStruct();
-    }
-
-    @Test(expected = InvalidRecordException.class)
-    public void produceRequestV3CannotUseMagicV1() {
-        ByteBuffer buffer = ByteBuffer.allocate(256);
-        MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, RecordBatch.MAGIC_VALUE_V1, CompressionType.NONE,
-                TimestampType.CREATE_TIME, 0L);
-        builder.append(10L, null, "a".getBytes());
-
-        Map<TopicPartition, MemoryRecords> produceData = new HashMap<>();
-        produceData.put(new TopicPartition("test", 0), builder.build());
-        new ProduceRequest.Builder(RecordBatch.CURRENT_MAGIC_VALUE, (short) 1, 5000, produceData).build().toStruct();
-    }
-
     @Test
     public void fetchResponseVersionTest() {
         LinkedHashMap<TopicPartition, FetchResponse.PartitionData> responseData = new LinkedHashMap<>();
@@ -774,7 +721,8 @@ public class RequestResponseTest {
         byte magic = version == 2 ? RecordBatch.MAGIC_VALUE_V1 : RecordBatch.MAGIC_VALUE_V2;
         MemoryRecords records = MemoryRecords.withRecords(magic, CompressionType.NONE, new SimpleRecord("woot".getBytes()));
         Map<TopicPartition, MemoryRecords> produceData = Collections.singletonMap(new TopicPartition("test", 0), records);
-        return new ProduceRequest.Builder(magic, (short) 1, 5000, produceData).build((short) version);
+        return ProduceRequest.Builder.forMagic(magic, (short) 1, 5000, produceData, "transactionalId")
+                .build((short) version);
     }
 
     private ProduceResponse createProduceResponse() {
@@ -796,11 +744,11 @@ public class RequestResponseTest {
     }
 
     private ControlledShutdownRequest createControlledShutdownRequest() {
-        return new ControlledShutdownRequest.Builder(10).build();
+        return new ControlledShutdownRequest.Builder(10, ApiKeys.CONTROLLED_SHUTDOWN.latestVersion()).build();
     }
 
     private ControlledShutdownRequest createControlledShutdownRequest(int version) {
-        return new ControlledShutdownRequest.Builder(10).build((short) version);
+        return new ControlledShutdownRequest.Builder(10, ApiKeys.CONTROLLED_SHUTDOWN.latestVersion()).build((short) version);
     }
 
     private ControlledShutdownResponse createControlledShutdownResponse() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/852297ef/core/src/main/scala/kafka/common/InterBrokerSendThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/common/InterBrokerSendThread.scala b/core/src/main/scala/kafka/common/InterBrokerSendThread.scala
index 06158b2..70dae35 100644
--- a/core/src/main/scala/kafka/common/InterBrokerSendThread.scala
+++ b/core/src/main/scala/kafka/common/InterBrokerSendThread.scala
@@ -59,15 +59,16 @@ abstract class InterBrokerSendThread(name: String,
         if (networkClient.ready(request.destination, now)) {
           networkClient.send(clientRequest, now)
         } else {
-          val disConnectedResponse: ClientResponse = new ClientResponse(clientRequest.makeHeader(request.request.desiredOrLatestVersion()),
-            completionHandler, destination,
-            now /* createdTimeMs */ , now /* receivedTimeMs */ , true /* disconnected */ , null /* versionMismatch */ , null /* responseBody */)
+          val header = clientRequest.makeHeader(request.request.latestAllowedVersion)
+          val disconnectResponse: ClientResponse = new ClientResponse(header, completionHandler, destination,
+            now /* createdTimeMs */ , now /* receivedTimeMs */ , true /* disconnected */ , null /* versionMismatch */ ,
+            null /* responseBody */)
 
           // poll timeout would be the minimum of connection delay if there are any dest yet to be reached;
           // otherwise it is infinity
           pollTimeout = Math.min(pollTimeout, networkClient.connectionDelay(request.destination, now))
 
-          completionHandler.onComplete(disConnectedResponse)
+          completionHandler.onComplete(disconnectResponse)
         }
       }
       networkClient.poll(pollTimeout, now)

http://git-wip-us.apache.org/repos/asf/kafka/blob/852297ef/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index 7aeffb5..02c40b1 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -256,7 +256,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
   }
 
   private def createProduceRequest = {
-    new requests.ProduceRequest.Builder(RecordBatch.CURRENT_MAGIC_VALUE, 1, 5000,
+    requests.ProduceRequest.Builder.forCurrentMagic(1, 5000,
       collection.mutable.Map(tp -> MemoryRecords.withRecords(CompressionType.NONE, new SimpleRecord("test".getBytes))).asJava).
       build()
   }
@@ -330,7 +330,8 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
 
   private def stopReplicaRequest = new StopReplicaRequest.Builder(brokerId, Int.MaxValue, true, Set(tp).asJava).build()
 
-  private def controlledShutdownRequest = new requests.ControlledShutdownRequest.Builder(brokerId).build()
+  private def controlledShutdownRequest = new requests.ControlledShutdownRequest.Builder(brokerId,
+    ApiKeys.CONTROLLED_SHUTDOWN.latestVersion).build()
 
   private def createTopicsRequest =
     new CreateTopicsRequest.Builder(Map(createTopic -> new TopicDetails(1, 1.toShort)).asJava, 0).build()

http://git-wip-us.apache.org/repos/asf/kafka/blob/852297ef/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
index d623374..8b611f2 100644
--- a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
+++ b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
@@ -147,7 +147,7 @@ class SocketServerTest extends JUnitSuite {
     val ackTimeoutMs = 10000
     val ack = 0: Short
 
-    val emptyRequest = new ProduceRequest.Builder(RecordBatch.CURRENT_MAGIC_VALUE, ack, ackTimeoutMs,
+    val emptyRequest = ProduceRequest.Builder.forCurrentMagic(ack, ackTimeoutMs,
       new HashMap[TopicPartition, MemoryRecords]()).build()
     val emptyHeader = new RequestHeader(ApiKeys.PRODUCE, emptyRequest.version, clientId, correlationId)
     val byteBuffer = emptyRequest.serialize(emptyHeader)
@@ -465,7 +465,7 @@ class SocketServerTest extends JUnitSuite {
       val clientId = ""
       val ackTimeoutMs = 10000
       val ack = 0: Short
-      val emptyRequest = new ProduceRequest.Builder(RecordBatch.CURRENT_MAGIC_VALUE, ack, ackTimeoutMs,
+      val emptyRequest = ProduceRequest.Builder.forCurrentMagic(ack, ackTimeoutMs,
         new HashMap[TopicPartition, MemoryRecords]()).build()
       val emptyHeader = new RequestHeader(ApiKeys.PRODUCE, emptyRequest.version, clientId, correlationId)
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/852297ef/core/src/test/scala/unit/kafka/server/EdgeCaseRequestTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/EdgeCaseRequestTest.scala b/core/src/test/scala/unit/kafka/server/EdgeCaseRequestTest.scala
index b9a4dfe..092dffe 100755
--- a/core/src/test/scala/unit/kafka/server/EdgeCaseRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/EdgeCaseRequestTest.scala
@@ -119,8 +119,7 @@ class EdgeCaseRequestTest extends KafkaServerTestHarness {
       val headerBytes = requestHeaderBytes(ApiKeys.PRODUCE.id, version, null,
         correlationId)
       val records = MemoryRecords.withRecords(CompressionType.NONE, new SimpleRecord("message".getBytes))
-      val request = new ProduceRequest.Builder(RecordBatch.CURRENT_MAGIC_VALUE, 1, 10000,
-        Map(topicPartition -> records).asJava).build()
+      val request = ProduceRequest.Builder.forCurrentMagic(1, 10000, Map(topicPartition -> records).asJava).build()
       val byteBuffer = ByteBuffer.allocate(headerBytes.length + request.toStruct.sizeOf)
       byteBuffer.put(headerBytes)
       request.toStruct.writeTo(byteBuffer)

http://git-wip-us.apache.org/repos/asf/kafka/blob/852297ef/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala b/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala
index 189f57c..029bb33 100644
--- a/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala
@@ -41,7 +41,7 @@ class ProduceRequestTest extends BaseRequestTest {
       val topicPartition = new TopicPartition("topic", partition)
       val partitionRecords = Map(topicPartition -> memoryRecords)
       val produceResponse = sendProduceRequest(leader,
-          new ProduceRequest.Builder(RecordBatch.CURRENT_MAGIC_VALUE, -1, 3000, partitionRecords.asJava).build())
+          ProduceRequest.Builder.forCurrentMagic(-1, 3000, partitionRecords.asJava).build())
       assertEquals(1, produceResponse.responses.size)
       val (tp, partitionResponse) = produceResponse.responses.asScala.head
       assertEquals(topicPartition, tp)
@@ -79,7 +79,7 @@ class ProduceRequestTest extends BaseRequestTest {
     val topicPartition = new TopicPartition("topic", partition)
     val partitionRecords = Map(topicPartition -> memoryRecords)
     val produceResponse = sendProduceRequest(leader, 
-      new ProduceRequest.Builder(RecordBatch.CURRENT_MAGIC_VALUE, -1, 3000, partitionRecords.asJava).build())
+      ProduceRequest.Builder.forCurrentMagic(-1, 3000, partitionRecords.asJava).build())
     assertEquals(1, produceResponse.responses.size)
     val (tp, partitionResponse) = produceResponse.responses.asScala.head
     assertEquals(topicPartition, tp)

http://git-wip-us.apache.org/repos/asf/kafka/blob/852297ef/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
index c244bd7..e15ea4b 100644
--- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
+++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
@@ -165,7 +165,7 @@ class RequestQuotaTest extends BaseRequestTest {
   private def requestBuilder(apiKey: ApiKeys): AbstractRequest.Builder[_ <: AbstractRequest] = {
     apiKey match {
         case ApiKeys.PRODUCE =>
-          new ProduceRequest.Builder(RecordBatch.CURRENT_MAGIC_VALUE, 1, 5000,
+          ProduceRequest.Builder.forCurrentMagic(1, 5000,
             collection.mutable.Map(tp -> MemoryRecords.withRecords(CompressionType.NONE, new SimpleRecord("test".getBytes))).asJava)
 
         case ApiKeys.FETCH =>
@@ -198,7 +198,7 @@ class RequestQuotaTest extends BaseRequestTest {
           new UpdateMetadataRequest.Builder(ApiKeys.UPDATE_METADATA.latestVersion, brokerId, Int.MaxValue, partitionState, brokers)
 
         case ApiKeys.CONTROLLED_SHUTDOWN =>
-          new ControlledShutdownRequest.Builder(brokerId)
+          new ControlledShutdownRequest.Builder(brokerId, ApiKeys.CONTROLLED_SHUTDOWN.latestVersion)
 
         case ApiKeys.OFFSET_COMMIT =>
           new OffsetCommitRequest.Builder("test-group",