You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2017/05/03 00:19:46 UTC

kafka git commit: MINOR: Serialize the real isolationLevel in FetchRequest

Repository: kafka
Updated Branches:
  refs/heads/trunk 619fd7aeb -> 0e7cc4aa3


MINOR: Serialize the real isolationLevel in FetchRequest

Author: Apurva Mehta <ap...@confluent.io>

Reviewers: Ismael Juma <is...@juma.me.uk>

Closes #2961 from apurvam/MINOR-serialize-isolation-level-in-fetch-request


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/0e7cc4aa
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/0e7cc4aa
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/0e7cc4aa

Branch: refs/heads/trunk
Commit: 0e7cc4aa384372d1e08d7bd444b5e23510f447d9
Parents: 619fd7a
Author: Apurva Mehta <ap...@confluent.io>
Authored: Wed May 3 01:19:35 2017 +0100
Committer: Ismael Juma <is...@juma.me.uk>
Committed: Wed May 3 01:19:35 2017 +0100

----------------------------------------------------------------------
 .../kafka/common/requests/FetchRequest.java     |  4 ++--
 .../clients/consumer/internals/FetcherTest.java |  9 ++++++++-
 .../common/requests/RequestResponseTest.java    | 20 ++++++++++++++++++++
 3 files changed, 30 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/0e7cc4aa/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
index 4c6998b..fc7d53c 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
@@ -145,7 +145,7 @@ public class FetchRequest extends AbstractRequest {
                 maxBytes = DEFAULT_RESPONSE_MAX_BYTES;
             }
 
-            return new FetchRequest(version, replicaId, maxWait, minBytes, maxBytes, fetchData, IsolationLevel.READ_UNCOMMITTED);
+            return new FetchRequest(version, replicaId, maxWait, minBytes, maxBytes, fetchData, isolationLevel);
         }
 
         @Override
@@ -266,7 +266,7 @@ public class FetchRequest extends AbstractRequest {
         if (struct.hasField(MAX_BYTES_KEY_NAME))
             struct.set(MAX_BYTES_KEY_NAME, maxBytes);
         if (struct.hasField(ISOLATION_LEVEL_KEY_NAME))
-            struct.set(ISOLATION_LEVEL_KEY_NAME, IsolationLevel.READ_UNCOMMITTED.id());
+            struct.set(ISOLATION_LEVEL_KEY_NAME, isolationLevel.id());
 
         List<Struct> topicArray = new ArrayList<>();
         for (TopicAndPartitionData<PartitionData> topicEntry : topicsData) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/0e7cc4aa/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index f0dd09c..e38e583 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -1252,8 +1252,15 @@ public class FetcherTest {
         // normal fetch
         assertEquals(1, fetcher.sendFetches());
         assertFalse(fetcher.hasCompletedFetches());
+        client.prepareResponse(new MockClient.RequestMatcher() {
+            @Override
+            public boolean matches(AbstractRequest body) {
+                FetchRequest request = (FetchRequest) body;
+                assertEquals(IsolationLevel.READ_COMMITTED, request.isolationLevel());
+                return true;
+            }
+        }, fetchResponseWithAbortedTransactions(records, abortedTransactions, Errors.NONE, 100L, 100L, 0));
 
-        client.prepareResponse(fetchResponseWithAbortedTransactions(records, abortedTransactions, Errors.NONE, 100L, 100L, 0));
         consumerClient.poll(0);
         assertTrue(fetcher.hasCompletedFetches());
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/0e7cc4aa/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index 7c53b54..c948fd1 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -495,6 +495,19 @@ public class RequestResponseTest {
         FetchRequest fr2 = new FetchRequest(fr.toStruct(), version);
         assertEquals(fr2.maxBytes(), fr.maxBytes());
     }
+
+    @Test
+    public void testFetchRequestIsolationLevel() throws Exception {
+        FetchRequest request = createFetchRequest(4, IsolationLevel.READ_COMMITTED);
+        Struct struct = request.toStruct();
+        FetchRequest deserialized = (FetchRequest) deserialize(request, struct, request.version());
+        assertEquals(request.isolationLevel(), deserialized.isolationLevel());
+
+        request = createFetchRequest(4, IsolationLevel.READ_UNCOMMITTED);
+        struct = request.toStruct();
+        deserialized = (FetchRequest) deserialize(request, struct, request.version());
+        assertEquals(request.isolationLevel(), deserialized.isolationLevel());
+    }
     
     private RequestHeader createRequestHeader() {
         return new RequestHeader((short) 10, (short) 1, "", 10);
@@ -513,6 +526,13 @@ public class RequestResponseTest {
         return new FindCoordinatorResponse(Errors.NONE, new Node(10, "host1", 2014));
     }
 
+    private FetchRequest createFetchRequest(int version, IsolationLevel isolationLevel) {
+        LinkedHashMap<TopicPartition, FetchRequest.PartitionData> fetchData = new LinkedHashMap<>();
+        fetchData.put(new TopicPartition("test1", 0), new FetchRequest.PartitionData(100, 0L, 1000000));
+        fetchData.put(new TopicPartition("test2", 0), new FetchRequest.PartitionData(200, 0L, 1000000));
+        return FetchRequest.Builder.forConsumer(100, 100000, fetchData, isolationLevel).setMaxBytes(1000).build((short) version);
+    }
+
     private FetchRequest createFetchRequest(int version) {
         LinkedHashMap<TopicPartition, FetchRequest.PartitionData> fetchData = new LinkedHashMap<>();
         fetchData.put(new TopicPartition("test1", 0), new FetchRequest.PartitionData(100, 0L, 1000000));