You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2017/05/03 00:19:46 UTC
kafka git commit: MINOR: Serialize the real isolationLevel in FetchRequest
Repository: kafka
Updated Branches:
refs/heads/trunk 619fd7aeb -> 0e7cc4aa3
MINOR: Serialize the real isolationLevel in FetchRequest
Author: Apurva Mehta <ap...@confluent.io>
Reviewers: Ismael Juma <is...@juma.me.uk>
Closes #2961 from apurvam/MINOR-serialize-isolation-level-in-fetch-request
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/0e7cc4aa
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/0e7cc4aa
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/0e7cc4aa
Branch: refs/heads/trunk
Commit: 0e7cc4aa384372d1e08d7bd444b5e23510f447d9
Parents: 619fd7a
Author: Apurva Mehta <ap...@confluent.io>
Authored: Wed May 3 01:19:35 2017 +0100
Committer: Ismael Juma <is...@juma.me.uk>
Committed: Wed May 3 01:19:35 2017 +0100
----------------------------------------------------------------------
.../kafka/common/requests/FetchRequest.java | 4 ++--
.../clients/consumer/internals/FetcherTest.java | 9 ++++++++-
.../common/requests/RequestResponseTest.java | 20 ++++++++++++++++++++
3 files changed, 30 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/0e7cc4aa/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
index 4c6998b..fc7d53c 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
@@ -145,7 +145,7 @@ public class FetchRequest extends AbstractRequest {
maxBytes = DEFAULT_RESPONSE_MAX_BYTES;
}
- return new FetchRequest(version, replicaId, maxWait, minBytes, maxBytes, fetchData, IsolationLevel.READ_UNCOMMITTED);
+ return new FetchRequest(version, replicaId, maxWait, minBytes, maxBytes, fetchData, isolationLevel);
}
@Override
@@ -266,7 +266,7 @@ public class FetchRequest extends AbstractRequest {
if (struct.hasField(MAX_BYTES_KEY_NAME))
struct.set(MAX_BYTES_KEY_NAME, maxBytes);
if (struct.hasField(ISOLATION_LEVEL_KEY_NAME))
- struct.set(ISOLATION_LEVEL_KEY_NAME, IsolationLevel.READ_UNCOMMITTED.id());
+ struct.set(ISOLATION_LEVEL_KEY_NAME, isolationLevel.id());
List<Struct> topicArray = new ArrayList<>();
for (TopicAndPartitionData<PartitionData> topicEntry : topicsData) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/0e7cc4aa/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index f0dd09c..e38e583 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -1252,8 +1252,15 @@ public class FetcherTest {
// normal fetch
assertEquals(1, fetcher.sendFetches());
assertFalse(fetcher.hasCompletedFetches());
+ client.prepareResponse(new MockClient.RequestMatcher() {
+ @Override
+ public boolean matches(AbstractRequest body) {
+ FetchRequest request = (FetchRequest) body;
+ assertEquals(IsolationLevel.READ_COMMITTED, request.isolationLevel());
+ return true;
+ }
+ }, fetchResponseWithAbortedTransactions(records, abortedTransactions, Errors.NONE, 100L, 100L, 0));
- client.prepareResponse(fetchResponseWithAbortedTransactions(records, abortedTransactions, Errors.NONE, 100L, 100L, 0));
consumerClient.poll(0);
assertTrue(fetcher.hasCompletedFetches());
http://git-wip-us.apache.org/repos/asf/kafka/blob/0e7cc4aa/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index 7c53b54..c948fd1 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -495,6 +495,19 @@ public class RequestResponseTest {
FetchRequest fr2 = new FetchRequest(fr.toStruct(), version);
assertEquals(fr2.maxBytes(), fr.maxBytes());
}
+
+ @Test
+ public void testFetchRequestIsolationLevel() throws Exception {
+ FetchRequest request = createFetchRequest(4, IsolationLevel.READ_COMMITTED);
+ Struct struct = request.toStruct();
+ FetchRequest deserialized = (FetchRequest) deserialize(request, struct, request.version());
+ assertEquals(request.isolationLevel(), deserialized.isolationLevel());
+
+ request = createFetchRequest(4, IsolationLevel.READ_UNCOMMITTED);
+ struct = request.toStruct();
+ deserialized = (FetchRequest) deserialize(request, struct, request.version());
+ assertEquals(request.isolationLevel(), deserialized.isolationLevel());
+ }
private RequestHeader createRequestHeader() {
return new RequestHeader((short) 10, (short) 1, "", 10);
@@ -513,6 +526,13 @@ public class RequestResponseTest {
return new FindCoordinatorResponse(Errors.NONE, new Node(10, "host1", 2014));
}
+ private FetchRequest createFetchRequest(int version, IsolationLevel isolationLevel) {
+ LinkedHashMap<TopicPartition, FetchRequest.PartitionData> fetchData = new LinkedHashMap<>();
+ fetchData.put(new TopicPartition("test1", 0), new FetchRequest.PartitionData(100, 0L, 1000000));
+ fetchData.put(new TopicPartition("test2", 0), new FetchRequest.PartitionData(200, 0L, 1000000));
+ return FetchRequest.Builder.forConsumer(100, 100000, fetchData, isolationLevel).setMaxBytes(1000).build((short) version);
+ }
+
private FetchRequest createFetchRequest(int version) {
LinkedHashMap<TopicPartition, FetchRequest.PartitionData> fetchData = new LinkedHashMap<>();
fetchData.put(new TopicPartition("test1", 0), new FetchRequest.PartitionData(100, 0L, 1000000));