You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2017/09/25 23:13:15 UTC
kafka git commit: KAFKA-5960; Fix regression in produce version selection on old brokers
Repository: kafka
Updated Branches:
refs/heads/trunk a96e28eac -> 852297efd
KAFKA-5960; Fix regression in produce version selection on old brokers
Author: Jason Gustafson <ja...@confluent.io>
Reviewers: Apurva Mehta <ap...@confluent.io>, Ismael Juma <is...@juma.me.uk>
Closes #3944 from hachikuji/KAFKA-5960
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/852297ef
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/852297ef
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/852297ef
Branch: refs/heads/trunk
Commit: 852297efd99af04df28710b1b5c99530ab20a072
Parents: a96e28e
Author: Jason Gustafson <ja...@confluent.io>
Authored: Tue Sep 26 00:13:02 2017 +0100
Committer: Ismael Juma <is...@juma.me.uk>
Committed: Tue Sep 26 00:13:02 2017 +0100
----------------------------------------------------------------------
.../org/apache/kafka/clients/NetworkClient.java | 7 +-
.../apache/kafka/clients/NodeApiVersions.java | 38 ++---
.../internals/ConsumerNetworkClient.java | 2 +-
.../clients/producer/internals/Sender.java | 2 +-
.../kafka/common/requests/AbstractRequest.java | 24 +--
.../requests/ControlledShutdownRequest.java | 6 +-
.../kafka/common/requests/FetchRequest.java | 15 +-
.../common/requests/ListOffsetRequest.java | 16 +-
.../kafka/common/requests/ProduceRequest.java | 54 ++++---
.../org/apache/kafka/clients/MockClient.java | 11 +-
.../apache/kafka/clients/NetworkClientTest.java | 7 +-
.../kafka/clients/NodeApiVersionsTest.java | 24 ++-
.../clients/producer/internals/SenderTest.java | 2 +-
.../common/requests/ProduceRequestTest.java | 153 +++++++++++++++----
.../common/requests/RequestResponseTest.java | 64 +-------
.../kafka/common/InterBrokerSendThread.scala | 9 +-
.../kafka/api/AuthorizerIntegrationTest.scala | 5 +-
.../unit/kafka/network/SocketServerTest.scala | 4 +-
.../unit/kafka/server/EdgeCaseRequestTest.scala | 3 +-
.../unit/kafka/server/ProduceRequestTest.scala | 4 +-
.../unit/kafka/server/RequestQuotaTest.scala | 4 +-
21 files changed, 243 insertions(+), 211 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/852297ef/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
index f8da42c..0fbaff7 100644
--- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
@@ -384,12 +384,13 @@ public class NetworkClient implements KafkaClient {
// the case when sending the initial ApiVersionRequest which fetches the version
// information itself. It is also the case when discoverBrokerVersions is set to false.
if (versionInfo == null) {
- version = builder.desiredOrLatestVersion();
+ version = builder.latestAllowedVersion();
if (discoverBrokerVersions && log.isTraceEnabled())
log.trace("No version information found when sending {} with correlation id {} to node {}. " +
"Assuming version {}.", clientRequest.apiKey(), clientRequest.correlationId(), nodeId, version);
} else {
- version = versionInfo.usableVersion(clientRequest.apiKey(), builder.desiredVersion());
+ version = versionInfo.usableVersion(clientRequest.apiKey(), builder.oldestAllowedVersion(),
+ builder.latestAllowedVersion());
}
// The call to build may also throw UnsupportedVersionException, if there are essential
// fields that cannot be represented in the chosen version.
@@ -399,7 +400,7 @@ public class NetworkClient implements KafkaClient {
// Instead, simply add it to the local queue of aborted requests.
log.debug("Version mismatch when attempting to send {} with correlation id {} to {}", builder,
clientRequest.correlationId(), clientRequest.destination(), e);
- ClientResponse clientResponse = new ClientResponse(clientRequest.makeHeader(builder.desiredOrLatestVersion()),
+ ClientResponse clientResponse = new ClientResponse(clientRequest.makeHeader(builder.latestAllowedVersion()),
clientRequest.callback(), clientRequest.destination(), now, now,
false, e, null);
abortedSends.add(clientResponse);
http://git-wip-us.apache.org/repos/asf/kafka/blob/852297ef/clients/src/main/java/org/apache/kafka/clients/NodeApiVersions.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/NodeApiVersions.java b/clients/src/main/java/org/apache/kafka/clients/NodeApiVersions.java
index dc2e6d1..93a5c72 100644
--- a/clients/src/main/java/org/apache/kafka/clients/NodeApiVersions.java
+++ b/clients/src/main/java/org/apache/kafka/clients/NodeApiVersions.java
@@ -89,24 +89,17 @@ public class NodeApiVersions {
* Return the most recent version supported by both the node and the local software.
*/
public short usableVersion(ApiKeys apiKey) {
- return usableVersion(apiKey, null);
+ return usableVersion(apiKey, apiKey.oldestVersion(), apiKey.latestVersion());
}
/**
- * Return the desired version (if usable) or the latest usable version if the desired version is null.
+ * Get the latest version supported by the broker within an allowed range of versions
*/
- public short usableVersion(ApiKeys apiKey, Short desiredVersion) {
+ public short usableVersion(ApiKeys apiKey, short oldestAllowedVersion, short latestAllowedVersion) {
UsableVersion usableVersion = usableVersions.get(apiKey);
if (usableVersion == null)
throw new UnsupportedVersionException("The broker does not support " + apiKey);
-
- if (desiredVersion == null) {
- usableVersion.ensureUsable();
- return usableVersion.value;
- } else {
- usableVersion.ensureUsable(desiredVersion);
- return desiredVersion;
- }
+ return usableVersion.latestSupportedVersion(oldestAllowedVersion, latestAllowedVersion);
}
/**
@@ -226,22 +219,15 @@ public class NodeApiVersions {
return value == NODE_TOO_NEW;
}
- private void ensureUsable() {
- if (value == NODE_TOO_OLD)
- throw new UnsupportedVersionException("The broker is too old to support " + apiKey +
- " version " + apiKey.oldestVersion());
- else if (value == NODE_TOO_NEW)
- throw new UnsupportedVersionException("The broker is too new to support " + apiKey +
- " version " + apiKey.latestVersion());
+ private short latestSupportedVersion(short minAllowedVersion, short maxAllowedVersion) {
+ short minVersion = (short) Math.max(minAllowedVersion, apiVersion.minVersion);
+ short maxVersion = (short) Math.min(maxAllowedVersion, apiVersion.maxVersion);
+ if (minVersion > maxVersion)
+ throw new UnsupportedVersionException("The broker does not support " + apiKey +
+ " with version in range [" + minAllowedVersion + "," + maxAllowedVersion + "]. The supported" +
+ " range is [" + apiVersion.minVersion + "," + apiVersion.maxVersion + "].");
+ return maxVersion;
}
-
- private void ensureUsable(short desiredVersion) {
- if (apiVersion.minVersion > desiredVersion || apiVersion.maxVersion < desiredVersion)
- throw new UnsupportedVersionException("The broker does not support the requested version " + desiredVersion +
- " for api " + apiKey + ". Supported versions are " + apiVersion.minVersion +
- " to " + apiVersion.maxVersion + ".");
- }
-
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/852297ef/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
index 86fca9e..2f9cb1b 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
@@ -375,7 +375,7 @@ public class ConsumerNetworkClient implements Closeable {
if (authenticationException != null)
handler.onFailure(authenticationException);
else
- handler.onComplete(new ClientResponse(request.makeHeader(request.requestBuilder().desiredOrLatestVersion()),
+ handler.onComplete(new ClientResponse(request.makeHeader(request.requestBuilder().latestAllowedVersion()),
request.callback(), request.destination(), request.createdTimeMs(), now, true,
null, null));
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/852297ef/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
index 45a2919..1aadf3d 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
@@ -686,7 +686,7 @@ public class Sender implements Runnable {
if (transactionManager != null && transactionManager.isTransactional()) {
transactionalId = transactionManager.transactionalId();
}
- ProduceRequest.Builder requestBuilder = new ProduceRequest.Builder(minUsedMagic, acks, timeout,
+ ProduceRequest.Builder requestBuilder = ProduceRequest.Builder.forMagic(minUsedMagic, acks, timeout,
produceRecordsByPartition, transactionalId);
RequestCompletionHandler callback = new RequestCompletionHandler() {
public void onComplete(ClientResponse response) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/852297ef/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
index e093f77..bbf13d6 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
@@ -27,31 +27,37 @@ public abstract class AbstractRequest extends AbstractRequestResponse {
public static abstract class Builder<T extends AbstractRequest> {
private final ApiKeys apiKey;
- private final Short desiredVersion;
+ private final short oldestAllowedVersion;
+ private final short latestAllowedVersion;
public Builder(ApiKeys apiKey) {
- this(apiKey, null);
+ this(apiKey, apiKey.oldestVersion(), apiKey.latestVersion());
}
- public Builder(ApiKeys apiKey, Short desiredVersion) {
+ public Builder(ApiKeys apiKey, short desiredVersion) {
+ this(apiKey, desiredVersion, desiredVersion);
+ }
+
+ public Builder(ApiKeys apiKey, short oldestAllowedVersion, short latestAllowedVersion) {
this.apiKey = apiKey;
- this.desiredVersion = desiredVersion;
+ this.oldestAllowedVersion = oldestAllowedVersion;
+ this.latestAllowedVersion = latestAllowedVersion;
}
public ApiKeys apiKey() {
return apiKey;
}
- public short desiredOrLatestVersion() {
- return desiredVersion == null ? apiKey.latestVersion() : desiredVersion;
+ public short oldestAllowedVersion() {
+ return oldestAllowedVersion;
}
- public Short desiredVersion() {
- return desiredVersion;
+ public short latestAllowedVersion() {
+ return latestAllowedVersion;
}
public T build() {
- return build(desiredOrLatestVersion());
+ return build(latestAllowedVersion());
}
public abstract T build(short version);
http://git-wip-us.apache.org/repos/asf/kafka/blob/852297ef/clients/src/main/java/org/apache/kafka/common/requests/ControlledShutdownRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ControlledShutdownRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ControlledShutdownRequest.java
index c77bd13..e6e8734 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ControlledShutdownRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ControlledShutdownRequest.java
@@ -42,11 +42,7 @@ public class ControlledShutdownRequest extends AbstractRequest {
public static class Builder extends AbstractRequest.Builder<ControlledShutdownRequest> {
private final int brokerId;
- public Builder(int brokerId) {
- this(brokerId, null);
- }
-
- public Builder(int brokerId, Short desiredVersion) {
+ public Builder(int brokerId, short desiredVersion) {
super(ApiKeys.CONTROLLED_SHUTDOWN, desiredVersion);
this.brokerId = brokerId;
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/852297ef/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
index 3fea26c..1a9e553 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
@@ -204,21 +204,24 @@ public class FetchRequest extends AbstractRequest {
private int maxBytes = DEFAULT_RESPONSE_MAX_BYTES;
public static Builder forConsumer(int maxWait, int minBytes, LinkedHashMap<TopicPartition, PartitionData> fetchData) {
- return new Builder(null, CONSUMER_REPLICA_ID, maxWait, minBytes, fetchData, IsolationLevel.READ_UNCOMMITTED);
+ return forConsumer(maxWait, minBytes, fetchData, IsolationLevel.READ_UNCOMMITTED);
}
- public static Builder forConsumer(int maxWait, int minBytes, LinkedHashMap<TopicPartition, PartitionData> fetchData, IsolationLevel isolationLevel) {
- return new Builder(null, CONSUMER_REPLICA_ID, maxWait, minBytes, fetchData, isolationLevel);
+ public static Builder forConsumer(int maxWait, int minBytes, LinkedHashMap<TopicPartition, PartitionData> fetchData,
+ IsolationLevel isolationLevel) {
+ return new Builder(ApiKeys.FETCH.oldestVersion(), ApiKeys.FETCH.latestVersion(), CONSUMER_REPLICA_ID,
+ maxWait, minBytes, fetchData, isolationLevel);
}
public static Builder forReplica(short desiredVersion, int replicaId, int maxWait, int minBytes,
LinkedHashMap<TopicPartition, PartitionData> fetchData) {
- return new Builder(desiredVersion, replicaId, maxWait, minBytes, fetchData, IsolationLevel.READ_UNCOMMITTED);
+ return new Builder(desiredVersion, desiredVersion, replicaId, maxWait, minBytes, fetchData,
+ IsolationLevel.READ_UNCOMMITTED);
}
- private Builder(Short desiredVersion, int replicaId, int maxWait, int minBytes,
+ private Builder(short minVersion, short maxVersion, int replicaId, int maxWait, int minBytes,
LinkedHashMap<TopicPartition, PartitionData> fetchData, IsolationLevel isolationLevel) {
- super(ApiKeys.FETCH, desiredVersion);
+ super(ApiKeys.FETCH, minVersion, maxVersion);
this.replicaId = replicaId;
this.maxWait = maxWait;
this.minBytes = minBytes;
http://git-wip-us.apache.org/repos/asf/kafka/blob/852297ef/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java
index ace582d..e252d3d 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java
@@ -17,7 +17,6 @@
package org.apache.kafka.common.requests;
import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.protocol.types.ArrayOf;
@@ -104,7 +103,6 @@ public class ListOffsetRequest extends AbstractRequest {
public static class Builder extends AbstractRequest.Builder<ListOffsetRequest> {
private final int replicaId;
- private final short minVersion;
private final IsolationLevel isolationLevel;
private Map<TopicPartition, PartitionData> offsetData = null;
private Map<TopicPartition, Long> partitionTimestamps = null;
@@ -114,18 +112,16 @@ public class ListOffsetRequest extends AbstractRequest {
}
public static Builder forConsumer(boolean requireTimestamp, IsolationLevel isolationLevel) {
- // If we need a timestamp in the response, the minimum RPC version we can send is v1. Otherwise, v0 is OK.
short minVersion = 0;
if (isolationLevel == IsolationLevel.READ_COMMITTED)
minVersion = 2;
else if (requireTimestamp)
minVersion = 1;
- return new Builder(minVersion, null, CONSUMER_REPLICA_ID, isolationLevel);
+ return new Builder(minVersion, ApiKeys.LIST_OFFSETS.latestVersion(), CONSUMER_REPLICA_ID, isolationLevel);
}
- private Builder(short minVersion, Short desiredVersion, int replicaId, IsolationLevel isolationLevel) {
- super(ApiKeys.LIST_OFFSETS, desiredVersion);
- this.minVersion = minVersion;
+ private Builder(short minVersion, short maxVersion, int replicaId, IsolationLevel isolationLevel) {
+ super(ApiKeys.LIST_OFFSETS, minVersion, maxVersion);
this.replicaId = replicaId;
this.isolationLevel = isolationLevel;
}
@@ -142,10 +138,6 @@ public class ListOffsetRequest extends AbstractRequest {
@Override
public ListOffsetRequest build(short version) {
- if (version < minVersion) {
- throw new UnsupportedVersionException("Cannot create a v" + version + " ListOffsetRequest because " +
- "we require features supported only in " + minVersion + " or later.");
- }
if (version == 0) {
if (offsetData == null) {
if (partitionTimestamps == null) {
@@ -184,7 +176,7 @@ public class ListOffsetRequest extends AbstractRequest {
if (partitionTimestamps != null) {
bld.append(", partitionTimestamps=").append(partitionTimestamps);
}
- bld.append(", minVersion=").append(minVersion);
+ bld.append(", isolationLevel=").append(isolationLevel);
bld.append(")");
return bld.toString();
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/852297ef/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java
index 8ab0b20..fbc7f76 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java
@@ -17,7 +17,6 @@
package org.apache.kafka.common.requests;
import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.protocol.types.ArrayOf;
@@ -116,37 +115,53 @@ public class ProduceRequest extends AbstractRequest {
}
public static class Builder extends AbstractRequest.Builder<ProduceRequest> {
- private final byte magic;
private final short acks;
private final int timeout;
private final Map<TopicPartition, MemoryRecords> partitionRecords;
private final String transactionalId;
- public Builder(byte magic,
- short acks,
- int timeout,
- Map<TopicPartition, MemoryRecords> partitionRecords,
- String transactionalId) {
- super(ApiKeys.PRODUCE, (short) (magic == RecordBatch.MAGIC_VALUE_V2 ? ApiKeys.PRODUCE.latestVersion() : 2));
- this.magic = magic;
+ public static Builder forCurrentMagic(short acks,
+ int timeout,
+ Map<TopicPartition, MemoryRecords> partitionRecords) {
+ return forMagic(RecordBatch.CURRENT_MAGIC_VALUE, acks, timeout, partitionRecords, null);
+ }
+
+ public static Builder forMagic(byte magic,
+ short acks,
+ int timeout,
+ Map<TopicPartition, MemoryRecords> partitionRecords,
+ String transactionalId) {
+ // Message format upgrades correspond with a bump in the produce request version. Older
+ // message format versions are generally not supported by the produce request versions
+ // following the bump.
+
+ final short minVersion;
+ final short maxVersion;
+ if (magic < RecordBatch.MAGIC_VALUE_V2) {
+ minVersion = 2;
+ maxVersion = 2;
+ } else {
+ minVersion = 3;
+ maxVersion = ApiKeys.PRODUCE.latestVersion();
+ }
+ return new Builder(minVersion, maxVersion, acks, timeout, partitionRecords, transactionalId);
+ }
+
+ private Builder(short minVersion,
+ short maxVersion,
+ short acks,
+ int timeout,
+ Map<TopicPartition, MemoryRecords> partitionRecords,
+ String transactionalId) {
+ super(ApiKeys.PRODUCE, minVersion, maxVersion);
this.acks = acks;
this.timeout = timeout;
this.partitionRecords = partitionRecords;
this.transactionalId = transactionalId;
}
- public Builder(byte magic,
- short acks,
- int timeout,
- Map<TopicPartition, MemoryRecords> partitionRecords) {
- this(magic, acks, timeout, partitionRecords, null);
- }
-
@Override
public ProduceRequest build(short version) {
- if (version < 2)
- throw new UnsupportedVersionException("ProduceRequest versions older than 2 are not supported.");
-
return new ProduceRequest(version, acks, timeout, partitionRecords, transactionalId);
}
@@ -154,7 +169,6 @@ public class ProduceRequest extends AbstractRequest {
public String toString() {
StringBuilder bld = new StringBuilder();
bld.append("(type=ProduceRequest")
- .append(", magic=").append(magic)
.append(", acks=").append(acks)
.append(", timeout=").append(timeout)
.append(", partitionRecords=(").append(partitionRecords)
http://git-wip-us.apache.org/repos/asf/kafka/blob/852297ef/clients/src/test/java/org/apache/kafka/clients/MockClient.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/MockClient.java b/clients/src/test/java/org/apache/kafka/clients/MockClient.java
index 66ff253..4037112 100644
--- a/clients/src/test/java/org/apache/kafka/clients/MockClient.java
+++ b/clients/src/test/java/org/apache/kafka/clients/MockClient.java
@@ -147,7 +147,7 @@ public class MockClient implements KafkaClient {
while (iter.hasNext()) {
ClientRequest request = iter.next();
if (request.destination().equals(node)) {
- short version = request.requestBuilder().desiredOrLatestVersion();
+ short version = request.requestBuilder().latestAllowedVersion();
responses.add(new ClientResponse(request.makeHeader(version), request.callback(), request.destination(),
request.createdTimeMs(), now, true, null, null));
iter.remove();
@@ -165,7 +165,8 @@ public class MockClient implements KafkaClient {
continue;
AbstractRequest.Builder<?> builder = request.requestBuilder();
- short version = nodeApiVersions.usableVersion(request.apiKey(), builder.desiredVersion());
+ short version = nodeApiVersions.usableVersion(request.apiKey(), builder.oldestAllowedVersion(),
+ builder.latestAllowedVersion());
AbstractRequest abstractRequest = request.requestBuilder().build(version);
if (!futureResp.requestMatcher.matches(abstractRequest))
throw new IllegalStateException("Request matcher did not match next-in-line request " + abstractRequest);
@@ -234,7 +235,7 @@ public class MockClient implements KafkaClient {
public void respondToRequest(ClientRequest clientRequest, AbstractResponse response) {
AbstractRequest request = clientRequest.requestBuilder().build();
requests.remove(clientRequest);
- short version = clientRequest.requestBuilder().desiredOrLatestVersion();
+ short version = clientRequest.requestBuilder().latestAllowedVersion();
responses.add(new ClientResponse(clientRequest.makeHeader(version), clientRequest.callback(), clientRequest.destination(),
clientRequest.createdTimeMs(), time.milliseconds(), false, null, response));
}
@@ -242,7 +243,7 @@ public class MockClient implements KafkaClient {
public void respond(AbstractResponse response, boolean disconnected) {
ClientRequest request = requests.remove();
- short version = request.requestBuilder().desiredOrLatestVersion();
+ short version = request.requestBuilder().latestAllowedVersion();
responses.add(new ClientResponse(request.makeHeader(version), request.callback(), request.destination(),
request.createdTimeMs(), time.milliseconds(), disconnected, null, response));
}
@@ -257,7 +258,7 @@ public class MockClient implements KafkaClient {
ClientRequest request = iterator.next();
if (request.destination().equals(node.idString())) {
iterator.remove();
- short version = request.requestBuilder().desiredOrLatestVersion();
+ short version = request.requestBuilder().latestAllowedVersion();
responses.add(new ClientResponse(request.makeHeader(version), request.callback(), request.destination(),
request.createdTimeMs(), time.milliseconds(), disconnected, null, response));
return;
http://git-wip-us.apache.org/repos/asf/kafka/blob/852297ef/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
index 86ad2ff..ff0f8f2 100644
--- a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
@@ -23,7 +23,6 @@ import org.apache.kafka.common.network.NetworkReceive;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.types.Struct;
import org.apache.kafka.common.record.MemoryRecords;
-import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.requests.ApiVersionsResponse;
import org.apache.kafka.common.requests.MetadataRequest;
import org.apache.kafka.common.requests.ProduceRequest;
@@ -116,7 +115,7 @@ public class NetworkClientTest {
client.poll(1, time.milliseconds());
assertTrue("The client should be ready", client.isReady(node, time.milliseconds()));
- ProduceRequest.Builder builder = new ProduceRequest.Builder(RecordBatch.CURRENT_MAGIC_VALUE, (short) 1, 1000,
+ ProduceRequest.Builder builder = ProduceRequest.Builder.forCurrentMagic((short) 1, 1000,
Collections.<TopicPartition, MemoryRecords>emptyMap());
ClientRequest request = client.newClientRequest(node.idString(), builder, time.milliseconds(), true);
client.send(request, time.milliseconds());
@@ -134,7 +133,7 @@ public class NetworkClientTest {
private void checkSimpleRequestResponse(NetworkClient networkClient) {
awaitReady(networkClient, node); // has to be before creating any request, as it may send ApiVersionsRequest and its response is mocked with correlation id 0
- ProduceRequest.Builder builder = new ProduceRequest.Builder(RecordBatch.CURRENT_MAGIC_VALUE, (short) 1, 1000,
+ ProduceRequest.Builder builder = ProduceRequest.Builder.forCurrentMagic((short) 1, 1000,
Collections.<TopicPartition, MemoryRecords>emptyMap());
TestCallbackHandler handler = new TestCallbackHandler();
ClientRequest request = networkClient.newClientRequest(
@@ -179,7 +178,7 @@ public class NetworkClientTest {
@Test
public void testRequestTimeout() {
awaitReady(client, node); // has to be before creating any request, as it may send ApiVersionsRequest and its response is mocked with correlation id 0
- ProduceRequest.Builder builder = new ProduceRequest.Builder(RecordBatch.CURRENT_MAGIC_VALUE, (short) 1,
+ ProduceRequest.Builder builder = ProduceRequest.Builder.forCurrentMagic((short) 1,
1000, Collections.<TopicPartition, MemoryRecords>emptyMap());
TestCallbackHandler handler = new TestCallbackHandler();
long now = time.milliseconds();
http://git-wip-us.apache.org/repos/asf/kafka/blob/852297ef/clients/src/test/java/org/apache/kafka/clients/NodeApiVersionsTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/NodeApiVersionsTest.java b/clients/src/test/java/org/apache/kafka/clients/NodeApiVersionsTest.java
index 6b1d92a..266d2c9 100644
--- a/clients/src/test/java/org/apache/kafka/clients/NodeApiVersionsTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/NodeApiVersionsTest.java
@@ -91,34 +91,32 @@ public class NodeApiVersionsTest {
}
@Test
- public void testUsableVersionNoDesiredVersionReturnsLatestUsable() {
- NodeApiVersions apiVersions = NodeApiVersions.create(Collections.singleton(
- new ApiVersion(ApiKeys.PRODUCE.id, (short) 1, (short) 3)));
- assertEquals(3, apiVersions.usableVersion(ApiKeys.PRODUCE, null));
- }
-
- @Test
public void testDesiredVersion() {
NodeApiVersions apiVersions = NodeApiVersions.create(Collections.singleton(
new ApiVersion(ApiKeys.PRODUCE.id, (short) 1, (short) 3)));
assertEquals(3, apiVersions.usableVersion(ApiKeys.PRODUCE));
- assertEquals(1, apiVersions.usableVersion(ApiKeys.PRODUCE, (short) 1));
- assertEquals(2, apiVersions.usableVersion(ApiKeys.PRODUCE, (short) 2));
- assertEquals(3, apiVersions.usableVersion(ApiKeys.PRODUCE, (short) 3));
+ assertEquals(1, apiVersions.usableVersion(ApiKeys.PRODUCE, (short) 0, (short) 1));
+ assertEquals(1, apiVersions.usableVersion(ApiKeys.PRODUCE, (short) 1, (short) 1));
+ assertEquals(2, apiVersions.usableVersion(ApiKeys.PRODUCE, (short) 1, (short) 2));
+ assertEquals(3, apiVersions.usableVersion(ApiKeys.PRODUCE, (short) 1, (short) 3));
+ assertEquals(2, apiVersions.usableVersion(ApiKeys.PRODUCE, (short) 2, (short) 2));
+ assertEquals(3, apiVersions.usableVersion(ApiKeys.PRODUCE, (short) 2, (short) 3));
+ assertEquals(3, apiVersions.usableVersion(ApiKeys.PRODUCE, (short) 3, (short) 3));
+ assertEquals(3, apiVersions.usableVersion(ApiKeys.PRODUCE, (short) 3, (short) 4));
}
@Test(expected = UnsupportedVersionException.class)
public void testDesiredVersionTooLarge() {
NodeApiVersions apiVersions = NodeApiVersions.create(Collections.singleton(
new ApiVersion(ApiKeys.PRODUCE.id, (short) 1, (short) 2)));
- apiVersions.usableVersion(ApiKeys.PRODUCE, (short) 3);
+ apiVersions.usableVersion(ApiKeys.PRODUCE, (short) 3, (short) 4);
}
@Test(expected = UnsupportedVersionException.class)
public void testDesiredVersionTooSmall() {
NodeApiVersions apiVersions = NodeApiVersions.create(Collections.singleton(
- new ApiVersion(ApiKeys.PRODUCE.id, (short) 1, (short) 2)));
- apiVersions.usableVersion(ApiKeys.PRODUCE, (short) 0);
+ new ApiVersion(ApiKeys.PRODUCE.id, (short) 2, (short) 3)));
+ apiVersions.usableVersion(ApiKeys.PRODUCE, (short) 0, (short) 1);
}
@Test(expected = UnsupportedVersionException.class)
http://git-wip-us.apache.org/repos/asf/kafka/blob/852297ef/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
index ecf77aa..a45d9ac 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
@@ -258,7 +258,7 @@ public class SenderTest {
for (int i = 1; i <= 3; i++) {
int throttleTimeMs = 100 * i;
- ProduceRequest.Builder builder = new ProduceRequest.Builder(RecordBatch.CURRENT_MAGIC_VALUE, (short) 1, 1000,
+ ProduceRequest.Builder builder = ProduceRequest.Builder.forCurrentMagic((short) 1, 1000,
Collections.<TopicPartition, MemoryRecords>emptyMap());
ClientRequest request = client.newClientRequest(node.idString(), builder, time.milliseconds(), true, null);
client.send(request, time.milliseconds());
http://git-wip-us.apache.org/repos/asf/kafka/blob/852297ef/clients/src/test/java/org/apache/kafka/common/requests/ProduceRequestTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/ProduceRequestTest.java b/clients/src/test/java/org/apache/kafka/common/requests/ProduceRequestTest.java
index 0e8f382..ef17c96 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/ProduceRequestTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/ProduceRequestTest.java
@@ -18,16 +18,25 @@
package org.apache.kafka.common.requests;
import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.record.InvalidRecordException;
import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.common.record.MemoryRecordsBuilder;
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.record.SimpleRecord;
+import org.apache.kafka.common.record.TimestampType;
import org.junit.Test;
+import java.nio.ByteBuffer;
import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
public class ProduceRequestTest {
@@ -37,18 +46,10 @@ public class ProduceRequestTest {
@Test
public void shouldBeFlaggedAsTransactionalWhenTransactionalRecords() throws Exception {
- final MemoryRecords memoryRecords = MemoryRecords.withTransactionalRecords(0,
- CompressionType.NONE,
- 1L,
- (short) 1,
- 1,
- 1,
- simpleRecord);
- final ProduceRequest request = new ProduceRequest.Builder(RecordBatch.CURRENT_MAGIC_VALUE,
- (short) -1,
- 10,
- Collections.singletonMap(
- new TopicPartition("topic", 1), memoryRecords)).build();
+ final MemoryRecords memoryRecords = MemoryRecords.withTransactionalRecords(0, CompressionType.NONE, 1L,
+ (short) 1, 1, 1, simpleRecord);
+ final ProduceRequest request = ProduceRequest.Builder.forCurrentMagic((short) -1,
+ 10, Collections.singletonMap(new TopicPartition("topic", 1), memoryRecords)).build();
assertTrue(request.isTransactional());
}
@@ -66,30 +67,116 @@ public class ProduceRequestTest {
@Test
public void shouldBeFlaggedAsIdempotentWhenIdempotentRecords() throws Exception {
- final MemoryRecords memoryRecords = MemoryRecords.withIdempotentRecords(1,
- CompressionType.NONE,
- 1L,
- (short) 1,
- 1,
- 1,
- simpleRecord);
-
- final ProduceRequest request = new ProduceRequest.Builder(RecordBatch.CURRENT_MAGIC_VALUE,
- (short) -1,
- 10,
- Collections.singletonMap(
- new TopicPartition("topic", 1), memoryRecords)).build();
+ final MemoryRecords memoryRecords = MemoryRecords.withIdempotentRecords(1, CompressionType.NONE, 1L,
+ (short) 1, 1, 1, simpleRecord);
+ final ProduceRequest request = ProduceRequest.Builder.forCurrentMagic((short) -1, 10,
+ Collections.singletonMap(new TopicPartition("topic", 1), memoryRecords)).build();
assertTrue(request.isIdempotent());
+ }
+
+ @Test
+ public void testBuildWithOldMessageFormat() {
+ ByteBuffer buffer = ByteBuffer.allocate(256);
+ MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, RecordBatch.MAGIC_VALUE_V1, CompressionType.NONE,
+ TimestampType.CREATE_TIME, 0L);
+ builder.append(10L, null, "a".getBytes());
+ Map<TopicPartition, MemoryRecords> produceData = new HashMap<>();
+ produceData.put(new TopicPartition("test", 0), builder.build());
+
+ ProduceRequest.Builder requestBuilder = ProduceRequest.Builder.forMagic(RecordBatch.MAGIC_VALUE_V1, (short) 1,
+ 5000, produceData, null);
+ assertEquals(2, requestBuilder.oldestAllowedVersion());
+ assertEquals(2, requestBuilder.latestAllowedVersion());
+ }
+
+ @Test
+ public void testBuildWithCurrentMessageFormat() {
+ ByteBuffer buffer = ByteBuffer.allocate(256);
+ MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, RecordBatch.CURRENT_MAGIC_VALUE,
+ CompressionType.NONE, TimestampType.CREATE_TIME, 0L);
+ builder.append(10L, null, "a".getBytes());
+ Map<TopicPartition, MemoryRecords> produceData = new HashMap<>();
+ produceData.put(new TopicPartition("test", 0), builder.build());
+
+ ProduceRequest.Builder requestBuilder = ProduceRequest.Builder.forMagic(RecordBatch.CURRENT_MAGIC_VALUE,
+ (short) 1, 5000, produceData, null);
+ assertEquals(3, requestBuilder.oldestAllowedVersion());
+ assertEquals(ApiKeys.PRODUCE.latestVersion(), requestBuilder.latestAllowedVersion());
+ }
+
+ @Test
+ public void testV3AndAboveShouldContainOnlyOneRecordBatch() {
+ ByteBuffer buffer = ByteBuffer.allocate(256);
+ MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, CompressionType.NONE, TimestampType.CREATE_TIME, 0L);
+ builder.append(10L, null, "a".getBytes());
+ builder.close();
+
+ builder = MemoryRecords.builder(buffer, CompressionType.NONE, TimestampType.CREATE_TIME, 1L);
+ builder.append(11L, "1".getBytes(), "b".getBytes());
+ builder.append(12L, null, "c".getBytes());
+ builder.close();
+
+ buffer.flip();
+
+ Map<TopicPartition, MemoryRecords> produceData = new HashMap<>();
+ produceData.put(new TopicPartition("test", 0), MemoryRecords.readableRecords(buffer));
+ ProduceRequest.Builder requestBuilder = ProduceRequest.Builder.forCurrentMagic((short) 1, 5000, produceData);
+ assertThrowsInvalidRecordExceptionForAllVersions(requestBuilder);
+ }
+
+ @Test
+ public void testV3AndAboveCannotHaveNoRecordBatches() {
+ Map<TopicPartition, MemoryRecords> produceData = new HashMap<>();
+ produceData.put(new TopicPartition("test", 0), MemoryRecords.EMPTY);
+ ProduceRequest.Builder requestBuilder = ProduceRequest.Builder.forCurrentMagic((short) 1, 5000, produceData);
+ assertThrowsInvalidRecordExceptionForAllVersions(requestBuilder);
+ }
+
+ @Test
+ public void testV3AndAboveCannotUseMagicV0() {
+ ByteBuffer buffer = ByteBuffer.allocate(256);
+ MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, RecordBatch.MAGIC_VALUE_V0, CompressionType.NONE,
+ TimestampType.NO_TIMESTAMP_TYPE, 0L);
+ builder.append(10L, null, "a".getBytes());
+
+ Map<TopicPartition, MemoryRecords> produceData = new HashMap<>();
+ produceData.put(new TopicPartition("test", 0), builder.build());
+ ProduceRequest.Builder requestBuilder = ProduceRequest.Builder.forCurrentMagic((short) 1, 5000, produceData);
+ assertThrowsInvalidRecordExceptionForAllVersions(requestBuilder);
+ }
+
+ @Test
+ public void testV3AndAboveCannotUseMagicV1() {
+ ByteBuffer buffer = ByteBuffer.allocate(256);
+ MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, RecordBatch.MAGIC_VALUE_V1, CompressionType.NONE,
+ TimestampType.CREATE_TIME, 0L);
+ builder.append(10L, null, "a".getBytes());
+
+ Map<TopicPartition, MemoryRecords> produceData = new HashMap<>();
+ produceData.put(new TopicPartition("test", 0), builder.build());
+ ProduceRequest.Builder requestBuilder = ProduceRequest.Builder.forCurrentMagic((short) 1, 5000, produceData);
+ assertThrowsInvalidRecordExceptionForAllVersions(requestBuilder);
+ }
+
+ private void assertThrowsInvalidRecordExceptionForAllVersions(ProduceRequest.Builder builder) {
+ for (short version = builder.oldestAllowedVersion(); version < builder.latestAllowedVersion(); version++) {
+ assertThrowsInvalidRecordException(builder, version);
+ }
+ }
+ private void assertThrowsInvalidRecordException(ProduceRequest.Builder builder, short version) {
+ try {
+ builder.build(version).toStruct();
+ fail("Builder did not raise " + InvalidRecordException.class.getName() + " as expected");
+ } catch (RuntimeException e) {
+ assertTrue("Unexpected exception type " + e.getClass().getName(),
+ InvalidRecordException.class.isAssignableFrom(e.getClass()));
+ }
}
private ProduceRequest createNonIdempotentNonTransactionalRecords() {
- final MemoryRecords memoryRecords = MemoryRecords.withRecords(CompressionType.NONE,
- simpleRecord);
- return new ProduceRequest.Builder(RecordBatch.CURRENT_MAGIC_VALUE,
- (short) -1,
- 10,
- Collections.singletonMap(
- new TopicPartition("topic", 1), memoryRecords)).build();
+ final MemoryRecords memoryRecords = MemoryRecords.withRecords(CompressionType.NONE, simpleRecord);
+ return ProduceRequest.Builder.forCurrentMagic((short) -1, 10,
+ Collections.singletonMap(new TopicPartition("topic", 1), memoryRecords)).build();
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/852297ef/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index a76cc84..e96b188 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -17,14 +17,14 @@
package org.apache.kafka.common.requests;
import org.apache.kafka.clients.admin.NewPartitions;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.acl.AccessControlEntry;
import org.apache.kafka.common.acl.AccessControlEntryFilter;
import org.apache.kafka.common.acl.AclBinding;
import org.apache.kafka.common.acl.AclBindingFilter;
import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.acl.AclPermissionType;
-import org.apache.kafka.common.Node;
-import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.InvalidReplicaAssignmentException;
import org.apache.kafka.common.errors.InvalidTopicException;
import org.apache.kafka.common.errors.NotCoordinatorException;
@@ -39,12 +39,9 @@ import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.protocol.SecurityProtocol;
import org.apache.kafka.common.protocol.types.Struct;
import org.apache.kafka.common.record.CompressionType;
-import org.apache.kafka.common.record.InvalidRecordException;
import org.apache.kafka.common.record.MemoryRecords;
-import org.apache.kafka.common.record.MemoryRecordsBuilder;
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.record.SimpleRecord;
-import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.requests.CreateAclsRequest.AclCreation;
import org.apache.kafka.common.requests.CreateAclsResponse.AclCreationResponse;
import org.apache.kafka.common.requests.DeleteAclsResponse.AclDeletionResult;
@@ -397,56 +394,6 @@ public class RequestResponseTest {
assertEquals("Response data does not match", responseData, v2Response.responses());
}
- @Test(expected = InvalidRecordException.class)
- public void produceRequestV3ShouldContainOnlyOneRecordBatch() {
- ByteBuffer buffer = ByteBuffer.allocate(256);
- MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, CompressionType.NONE, TimestampType.CREATE_TIME, 0L);
- builder.append(10L, null, "a".getBytes());
- builder.close();
-
- builder = MemoryRecords.builder(buffer, CompressionType.NONE, TimestampType.CREATE_TIME, 1L);
- builder.append(11L, "1".getBytes(), "b".getBytes());
- builder.append(12L, null, "c".getBytes());
- builder.close();
-
- buffer.flip();
-
- Map<TopicPartition, MemoryRecords> produceData = new HashMap<>();
- produceData.put(new TopicPartition("test", 0), MemoryRecords.readableRecords(buffer));
- new ProduceRequest.Builder(RecordBatch.CURRENT_MAGIC_VALUE, (short) 1, 5000, produceData).build().toStruct();
- }
-
- @Test(expected = InvalidRecordException.class)
- public void produceRequestV3CannotHaveNoRecordBatches() {
- Map<TopicPartition, MemoryRecords> produceData = new HashMap<>();
- produceData.put(new TopicPartition("test", 0), MemoryRecords.EMPTY);
- new ProduceRequest.Builder(RecordBatch.CURRENT_MAGIC_VALUE, (short) 1, 5000, produceData).build().toStruct();
- }
-
- @Test(expected = InvalidRecordException.class)
- public void produceRequestV3CannotUseMagicV0() {
- ByteBuffer buffer = ByteBuffer.allocate(256);
- MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, RecordBatch.MAGIC_VALUE_V0, CompressionType.NONE,
- TimestampType.NO_TIMESTAMP_TYPE, 0L);
- builder.append(10L, null, "a".getBytes());
-
- Map<TopicPartition, MemoryRecords> produceData = new HashMap<>();
- produceData.put(new TopicPartition("test", 0), builder.build());
- new ProduceRequest.Builder(RecordBatch.CURRENT_MAGIC_VALUE, (short) 1, 5000, produceData).build().toStruct();
- }
-
- @Test(expected = InvalidRecordException.class)
- public void produceRequestV3CannotUseMagicV1() {
- ByteBuffer buffer = ByteBuffer.allocate(256);
- MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, RecordBatch.MAGIC_VALUE_V1, CompressionType.NONE,
- TimestampType.CREATE_TIME, 0L);
- builder.append(10L, null, "a".getBytes());
-
- Map<TopicPartition, MemoryRecords> produceData = new HashMap<>();
- produceData.put(new TopicPartition("test", 0), builder.build());
- new ProduceRequest.Builder(RecordBatch.CURRENT_MAGIC_VALUE, (short) 1, 5000, produceData).build().toStruct();
- }
-
@Test
public void fetchResponseVersionTest() {
LinkedHashMap<TopicPartition, FetchResponse.PartitionData> responseData = new LinkedHashMap<>();
@@ -774,7 +721,8 @@ public class RequestResponseTest {
byte magic = version == 2 ? RecordBatch.MAGIC_VALUE_V1 : RecordBatch.MAGIC_VALUE_V2;
MemoryRecords records = MemoryRecords.withRecords(magic, CompressionType.NONE, new SimpleRecord("woot".getBytes()));
Map<TopicPartition, MemoryRecords> produceData = Collections.singletonMap(new TopicPartition("test", 0), records);
- return new ProduceRequest.Builder(magic, (short) 1, 5000, produceData).build((short) version);
+ return ProduceRequest.Builder.forMagic(magic, (short) 1, 5000, produceData, "transactionalId")
+ .build((short) version);
}
private ProduceResponse createProduceResponse() {
@@ -796,11 +744,11 @@ public class RequestResponseTest {
}
private ControlledShutdownRequest createControlledShutdownRequest() {
- return new ControlledShutdownRequest.Builder(10).build();
+ return new ControlledShutdownRequest.Builder(10, ApiKeys.CONTROLLED_SHUTDOWN.latestVersion()).build();
}
private ControlledShutdownRequest createControlledShutdownRequest(int version) {
- return new ControlledShutdownRequest.Builder(10).build((short) version);
+ return new ControlledShutdownRequest.Builder(10, ApiKeys.CONTROLLED_SHUTDOWN.latestVersion()).build((short) version);
}
private ControlledShutdownResponse createControlledShutdownResponse() {
http://git-wip-us.apache.org/repos/asf/kafka/blob/852297ef/core/src/main/scala/kafka/common/InterBrokerSendThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/common/InterBrokerSendThread.scala b/core/src/main/scala/kafka/common/InterBrokerSendThread.scala
index 06158b2..70dae35 100644
--- a/core/src/main/scala/kafka/common/InterBrokerSendThread.scala
+++ b/core/src/main/scala/kafka/common/InterBrokerSendThread.scala
@@ -59,15 +59,16 @@ abstract class InterBrokerSendThread(name: String,
if (networkClient.ready(request.destination, now)) {
networkClient.send(clientRequest, now)
} else {
- val disConnectedResponse: ClientResponse = new ClientResponse(clientRequest.makeHeader(request.request.desiredOrLatestVersion()),
- completionHandler, destination,
- now /* createdTimeMs */ , now /* receivedTimeMs */ , true /* disconnected */ , null /* versionMismatch */ , null /* responseBody */)
+ val header = clientRequest.makeHeader(request.request.latestAllowedVersion)
+ val disconnectResponse: ClientResponse = new ClientResponse(header, completionHandler, destination,
+ now /* createdTimeMs */ , now /* receivedTimeMs */ , true /* disconnected */ , null /* versionMismatch */ ,
+ null /* responseBody */)
// poll timeout would be the minimum of connection delay if there are any dest yet to be reached;
// otherwise it is infinity
pollTimeout = Math.min(pollTimeout, networkClient.connectionDelay(request.destination, now))
- completionHandler.onComplete(disConnectedResponse)
+ completionHandler.onComplete(disconnectResponse)
}
}
networkClient.poll(pollTimeout, now)
http://git-wip-us.apache.org/repos/asf/kafka/blob/852297ef/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index 7aeffb5..02c40b1 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -256,7 +256,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
}
private def createProduceRequest = {
- new requests.ProduceRequest.Builder(RecordBatch.CURRENT_MAGIC_VALUE, 1, 5000,
+ requests.ProduceRequest.Builder.forCurrentMagic(1, 5000,
collection.mutable.Map(tp -> MemoryRecords.withRecords(CompressionType.NONE, new SimpleRecord("test".getBytes))).asJava).
build()
}
@@ -330,7 +330,8 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
private def stopReplicaRequest = new StopReplicaRequest.Builder(brokerId, Int.MaxValue, true, Set(tp).asJava).build()
- private def controlledShutdownRequest = new requests.ControlledShutdownRequest.Builder(brokerId).build()
+ private def controlledShutdownRequest = new requests.ControlledShutdownRequest.Builder(brokerId,
+ ApiKeys.CONTROLLED_SHUTDOWN.latestVersion).build()
private def createTopicsRequest =
new CreateTopicsRequest.Builder(Map(createTopic -> new TopicDetails(1, 1.toShort)).asJava, 0).build()
http://git-wip-us.apache.org/repos/asf/kafka/blob/852297ef/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
index d623374..8b611f2 100644
--- a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
+++ b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
@@ -147,7 +147,7 @@ class SocketServerTest extends JUnitSuite {
val ackTimeoutMs = 10000
val ack = 0: Short
- val emptyRequest = new ProduceRequest.Builder(RecordBatch.CURRENT_MAGIC_VALUE, ack, ackTimeoutMs,
+ val emptyRequest = ProduceRequest.Builder.forCurrentMagic(ack, ackTimeoutMs,
new HashMap[TopicPartition, MemoryRecords]()).build()
val emptyHeader = new RequestHeader(ApiKeys.PRODUCE, emptyRequest.version, clientId, correlationId)
val byteBuffer = emptyRequest.serialize(emptyHeader)
@@ -465,7 +465,7 @@ class SocketServerTest extends JUnitSuite {
val clientId = ""
val ackTimeoutMs = 10000
val ack = 0: Short
- val emptyRequest = new ProduceRequest.Builder(RecordBatch.CURRENT_MAGIC_VALUE, ack, ackTimeoutMs,
+ val emptyRequest = ProduceRequest.Builder.forCurrentMagic(ack, ackTimeoutMs,
new HashMap[TopicPartition, MemoryRecords]()).build()
val emptyHeader = new RequestHeader(ApiKeys.PRODUCE, emptyRequest.version, clientId, correlationId)
http://git-wip-us.apache.org/repos/asf/kafka/blob/852297ef/core/src/test/scala/unit/kafka/server/EdgeCaseRequestTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/EdgeCaseRequestTest.scala b/core/src/test/scala/unit/kafka/server/EdgeCaseRequestTest.scala
index b9a4dfe..092dffe 100755
--- a/core/src/test/scala/unit/kafka/server/EdgeCaseRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/EdgeCaseRequestTest.scala
@@ -119,8 +119,7 @@ class EdgeCaseRequestTest extends KafkaServerTestHarness {
val headerBytes = requestHeaderBytes(ApiKeys.PRODUCE.id, version, null,
correlationId)
val records = MemoryRecords.withRecords(CompressionType.NONE, new SimpleRecord("message".getBytes))
- val request = new ProduceRequest.Builder(RecordBatch.CURRENT_MAGIC_VALUE, 1, 10000,
- Map(topicPartition -> records).asJava).build()
+ val request = ProduceRequest.Builder.forCurrentMagic(1, 10000, Map(topicPartition -> records).asJava).build()
val byteBuffer = ByteBuffer.allocate(headerBytes.length + request.toStruct.sizeOf)
byteBuffer.put(headerBytes)
request.toStruct.writeTo(byteBuffer)
http://git-wip-us.apache.org/repos/asf/kafka/blob/852297ef/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala b/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala
index 189f57c..029bb33 100644
--- a/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala
@@ -41,7 +41,7 @@ class ProduceRequestTest extends BaseRequestTest {
val topicPartition = new TopicPartition("topic", partition)
val partitionRecords = Map(topicPartition -> memoryRecords)
val produceResponse = sendProduceRequest(leader,
- new ProduceRequest.Builder(RecordBatch.CURRENT_MAGIC_VALUE, -1, 3000, partitionRecords.asJava).build())
+ ProduceRequest.Builder.forCurrentMagic(-1, 3000, partitionRecords.asJava).build())
assertEquals(1, produceResponse.responses.size)
val (tp, partitionResponse) = produceResponse.responses.asScala.head
assertEquals(topicPartition, tp)
@@ -79,7 +79,7 @@ class ProduceRequestTest extends BaseRequestTest {
val topicPartition = new TopicPartition("topic", partition)
val partitionRecords = Map(topicPartition -> memoryRecords)
val produceResponse = sendProduceRequest(leader,
- new ProduceRequest.Builder(RecordBatch.CURRENT_MAGIC_VALUE, -1, 3000, partitionRecords.asJava).build())
+ ProduceRequest.Builder.forCurrentMagic(-1, 3000, partitionRecords.asJava).build())
assertEquals(1, produceResponse.responses.size)
val (tp, partitionResponse) = produceResponse.responses.asScala.head
assertEquals(topicPartition, tp)
http://git-wip-us.apache.org/repos/asf/kafka/blob/852297ef/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
index c244bd7..e15ea4b 100644
--- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
+++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
@@ -165,7 +165,7 @@ class RequestQuotaTest extends BaseRequestTest {
private def requestBuilder(apiKey: ApiKeys): AbstractRequest.Builder[_ <: AbstractRequest] = {
apiKey match {
case ApiKeys.PRODUCE =>
- new ProduceRequest.Builder(RecordBatch.CURRENT_MAGIC_VALUE, 1, 5000,
+ ProduceRequest.Builder.forCurrentMagic(1, 5000,
collection.mutable.Map(tp -> MemoryRecords.withRecords(CompressionType.NONE, new SimpleRecord("test".getBytes))).asJava)
case ApiKeys.FETCH =>
@@ -198,7 +198,7 @@ class RequestQuotaTest extends BaseRequestTest {
new UpdateMetadataRequest.Builder(ApiKeys.UPDATE_METADATA.latestVersion, brokerId, Int.MaxValue, partitionState, brokers)
case ApiKeys.CONTROLLED_SHUTDOWN =>
- new ControlledShutdownRequest.Builder(brokerId)
+ new ControlledShutdownRequest.Builder(brokerId, ApiKeys.CONTROLLED_SHUTDOWN.latestVersion)
case ApiKeys.OFFSET_COMMIT =>
new OffsetCommitRequest.Builder("test-group",