Posted to commits@kafka.apache.org by jg...@apache.org on 2017/03/25 21:08:02 UTC

kafka git commit: MINOR: Fix deserialization of abortedTransactions and lastStableOffset in FetchResponse

Repository: kafka
Updated Branches:
  refs/heads/trunk 462767660 -> d348ac92c


MINOR: Fix deserialization of abortedTransactions and lastStableOffset in FetchResponse

Thanks to Dong Lin for finding the lastStableOffset issue.
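
As the FetchResponse.java diff below shows, lastStableOffset and abortedTransactions live in the nested partition header struct (partitionResponseHeader), but the old parsing code looked them up on the enclosing partition struct (partitionResponse); the hasField checks therefore never matched, and both values silently fell back to their defaults. The default for abortedTransactions also changes from an empty list to null, so a response that omits the field is distinguishable from one that carries an empty list. Below is a minimal sketch of the failure mode; FakeStruct and the field names are simplified stand-ins for Kafka's real Struct and fetch response schema, not the actual API.

import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for Kafka's Struct: a named bag of fields.
final class FakeStruct {
    private final Map<String, Object> fields = new HashMap<>();

    FakeStruct set(String name, Object value) {
        fields.put(name, value);
        return this;
    }

    boolean hasField(String name) {
        return fields.containsKey(name);
    }

    long getLong(String name) {
        return (Long) fields.get(name);
    }
}

public class FetchResponseParsingSketch {
    private static final long INVALID_LSO = -1L;

    public static void main(String[] args) {
        // In the v4 fetch response, last_stable_offset sits inside the
        // partition header struct, which is nested in the partition struct.
        FakeStruct partitionResponseHeader = new FakeStruct()
                .set("last_stable_offset", 17L);
        FakeStruct partitionResponse = new FakeStruct()
                .set("partition_header", partitionResponseHeader)
                .set("record_set", new byte[0]);

        // Buggy lookup: the field is checked on the outer struct, so the
        // hasField check never matches and the default is silently used.
        long buggyLso = INVALID_LSO;
        if (partitionResponse.hasField("last_stable_offset"))
            buggyLso = partitionResponse.getLong("last_stable_offset");

        // Fixed lookup: read from the header struct, as in this commit.
        long fixedLso = INVALID_LSO;
        if (partitionResponseHeader.hasField("last_stable_offset"))
            fixedLso = partitionResponseHeader.getLong("last_stable_offset");

        System.out.println("buggy lastStableOffset = " + buggyLso); // -1
        System.out.println("fixed lastStableOffset = " + fixedLso); // 17
    }
}

The new equals and hashCode overrides on AbortedTransaction and PartitionData, together with the added testFetchResponseV4 test, let the fix be verified by a direct serialize/deserialize round-trip comparison of the response data.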

Author: Ismael Juma <is...@juma.me.uk>

Reviewers: Dong Lin <li...@gmail.com>, Jason Gustafson <ja...@confluent.io>

Closes #2737 from ijuma/fix-fetch-response-lso


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/d348ac92
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/d348ac92
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/d348ac92

Branch: refs/heads/trunk
Commit: d348ac92c80b6426864e3ed68b89bad341b3bda5
Parents: 4627676
Author: Ismael Juma <is...@juma.me.uk>
Authored: Sat Mar 25 14:03:40 2017 -0700
Committer: Jason Gustafson <ja...@confluent.io>
Committed: Sat Mar 25 14:03:40 2017 -0700

----------------------------------------------------------------------
 .../kafka/common/requests/FetchResponse.java    | 56 ++++++++++++--
 .../common/requests/RequestResponseTest.java    | 78 +++++++++++++-------
 2 files changed, 100 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/d348ac92/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
index 617ebe0..f0a0eee 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
@@ -29,7 +29,6 @@ import org.apache.kafka.common.record.Records;
 
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
@@ -86,6 +85,25 @@ public class FetchResponse extends AbstractResponse {
         }
 
         @Override
+        public boolean equals(Object o) {
+            if (this == o)
+                return true;
+            if (o == null || getClass() != o.getClass())
+                return false;
+
+            AbortedTransaction that = (AbortedTransaction) o;
+
+            return pid == that.pid && firstOffset == that.firstOffset;
+        }
+
+        @Override
+        public int hashCode() {
+            int result = (int) (pid ^ (pid >>> 32));
+            result = 31 * result + (int) (firstOffset ^ (firstOffset >>> 32));
+            return result;
+        }
+
+        @Override
         public String toString() {
             return "(pid=" + pid + ", firstOffset=" + firstOffset + ")";
         }
@@ -111,6 +129,32 @@ public class FetchResponse extends AbstractResponse {
         }
 
         @Override
+        public boolean equals(Object o) {
+            if (this == o)
+                return true;
+            if (o == null || getClass() != o.getClass())
+                return false;
+
+            PartitionData that = (PartitionData) o;
+
+            return error == that.error &&
+                    highWatermark == that.highWatermark &&
+                    lastStableOffset == that.lastStableOffset &&
+                    (abortedTransactions == null ? that.abortedTransactions == null : abortedTransactions.equals(that.abortedTransactions)) &&
+                    (records == null ? that.records == null : records.equals(that.records));
+        }
+
+        @Override
+        public int hashCode() {
+            int result = error != null ? error.hashCode() : 0;
+            result = 31 * result + (int) (lastStableOffset ^ (lastStableOffset >>> 32));
+            result = 31 * result + (int) (highWatermark ^ (highWatermark >>> 32));
+            result = 31 * result + (abortedTransactions != null ? abortedTransactions.hashCode() : 0);
+            result = 31 * result + (records != null ? records.hashCode() : 0);
+            return result;
+        }
+
+        @Override
         public String toString() {
             return "(error=" + error + ", highWaterMark=" + highWatermark +
                     ", lastStableOffset = " + lastStableOffset + ", " +
@@ -144,14 +188,14 @@ public class FetchResponse extends AbstractResponse {
                 Errors error = Errors.forCode(partitionResponseHeader.getShort(ERROR_CODE_KEY_NAME));
                 long highWatermark = partitionResponseHeader.getLong(HIGH_WATERMARK_KEY_NAME);
                 long lastStableOffset = INVALID_LSO;
-                if (partitionResponse.hasField(LAST_STABLE_OFFSET_KEY_NAME))
-                    lastStableOffset = partitionResponse.getLong(LAST_STABLE_OFFSET_KEY_NAME);
+                if (partitionResponseHeader.hasField(LAST_STABLE_OFFSET_KEY_NAME))
+                    lastStableOffset = partitionResponseHeader.getLong(LAST_STABLE_OFFSET_KEY_NAME);
 
                 Records records = partitionResponse.getRecords(RECORD_SET_KEY_NAME);
 
-                List<AbortedTransaction> abortedTransactions = Collections.emptyList();
-                if (partitionResponse.hasField(ABORTED_TRANSACTIONS_KEY_NAME)) {
-                    Object[] abortedTransactionsArray = partitionResponse.getArray(ABORTED_TRANSACTIONS_KEY_NAME);
+                List<AbortedTransaction> abortedTransactions = null;
+                if (partitionResponseHeader.hasField(ABORTED_TRANSACTIONS_KEY_NAME)) {
+                    Object[] abortedTransactionsArray = partitionResponseHeader.getArray(ABORTED_TRANSACTIONS_KEY_NAME);
                     if (abortedTransactionsArray != null) {
                         abortedTransactions = new ArrayList<>(abortedTransactionsArray.length);
                         for (Object abortedTransactionObj : abortedTransactionsArray) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/d348ac92/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index b9fbf06..8a6d69a 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -35,6 +35,7 @@ import org.apache.kafka.common.record.RecordBatch;
 import org.apache.kafka.common.record.MemoryRecords;
 import org.apache.kafka.common.record.MemoryRecordsBuilder;
 import org.apache.kafka.common.record.TimestampType;
+import org.apache.kafka.common.utils.Utils;
 import org.junit.Test;
 
 import java.io.IOException;
@@ -43,7 +44,6 @@ import java.lang.reflect.Method;
 import java.nio.ByteBuffer;
 import java.nio.channels.GatheringByteChannel;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -52,6 +52,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import static java.util.Arrays.asList;
 import static java.util.Collections.singletonList;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -91,10 +92,10 @@ public class RequestResponseTest {
         checkErrorResponse(createListOffsetRequest(1), new UnknownServerException());
         checkResponse(createListOffsetResponse(1), 1);
         checkRequest(MetadataRequest.Builder.allTopics().build((short) 2));
-        checkRequest(createMetadataRequest(1, Arrays.asList("topic1")));
-        checkErrorResponse(createMetadataRequest(1, Arrays.asList("topic1")), new UnknownServerException());
+        checkRequest(createMetadataRequest(1, asList("topic1")));
+        checkErrorResponse(createMetadataRequest(1, asList("topic1")), new UnknownServerException());
         checkResponse(createMetadataResponse(), 2);
-        checkErrorResponse(createMetadataRequest(2, Arrays.asList("topic1")), new UnknownServerException());
+        checkErrorResponse(createMetadataRequest(2, asList("topic1")), new UnknownServerException());
         checkRequest(createOffsetCommitRequest(2));
         checkErrorResponse(createOffsetCommitRequest(2), new UnknownServerException());
         checkResponse(createOffsetCommitResponse(), 0);
@@ -138,7 +139,7 @@ public class RequestResponseTest {
         checkOlderFetchVersions();
         checkResponse(createMetadataResponse(), 0);
         checkResponse(createMetadataResponse(), 1);
-        checkErrorResponse(createMetadataRequest(1, Arrays.asList("topic1")), new UnknownServerException());
+        checkErrorResponse(createMetadataRequest(1, asList("topic1")), new UnknownServerException());
         checkRequest(createOffsetCommitRequest(0));
         checkErrorResponse(createOffsetCommitRequest(0), new UnknownServerException());
         checkRequest(createOffsetCommitRequest(1));
@@ -362,6 +363,28 @@ public class RequestResponseTest {
     }
 
     @Test
+    public void testFetchResponseV4() {
+        LinkedHashMap<TopicPartition, FetchResponse.PartitionData> responseData = new LinkedHashMap<>();
+
+        MemoryRecords records = MemoryRecords.readableRecords(ByteBuffer.allocate(10));
+
+        List<FetchResponse.AbortedTransaction> abortedTransactions = asList(
+                new FetchResponse.AbortedTransaction(10, 100),
+                new FetchResponse.AbortedTransaction(15, 50)
+        );
+        responseData.put(new TopicPartition("bar", 0), new FetchResponse.PartitionData(Errors.NONE, 100000,
+                FetchResponse.INVALID_LSO, abortedTransactions, records));
+        responseData.put(new TopicPartition("bar", 1), new FetchResponse.PartitionData(Errors.NONE, 900000,
+                5, null, records));
+        responseData.put(new TopicPartition("foo", 0), new FetchResponse.PartitionData(Errors.NONE, 70000,
+                6, Collections.<FetchResponse.AbortedTransaction>emptyList(), records));
+
+        FetchResponse response = new FetchResponse(responseData, 10);
+        FetchResponse deserialized = FetchResponse.parse(toBuffer(response.toStruct((short) 4)), (short) 4);
+        assertEquals(responseData, deserialized.responseData());
+    }
+
+    @Test
     public void verifyFetchResponseFullWrite() throws Exception {
         FetchResponse fetchResponse = createFetchResponse();
         RequestHeader header = new RequestHeader(ApiKeys.FETCH.id, ApiKeys.FETCH.latestVersion(),
@@ -488,7 +511,7 @@ public class RequestResponseTest {
     }
 
     private ListGroupsResponse createListGroupsResponse() {
-        List<ListGroupsResponse.Group> groups = Arrays.asList(new ListGroupsResponse.Group("test-group", "consumer"));
+        List<ListGroupsResponse.Group> groups = asList(new ListGroupsResponse.Group("test-group", "consumer"));
         return new ListGroupsResponse(Errors.NONE, groups);
     }
 
@@ -503,7 +526,7 @@ public class RequestResponseTest {
         DescribeGroupsResponse.GroupMember member = new DescribeGroupsResponse.GroupMember("memberId",
                 clientId, clientHost, empty, empty);
         DescribeGroupsResponse.GroupMetadata metadata = new DescribeGroupsResponse.GroupMetadata(Errors.NONE,
-                "STABLE", "consumer", "roundrobin", Arrays.asList(member));
+                "STABLE", "consumer", "roundrobin", asList(member));
         return new DescribeGroupsResponse(Collections.singletonMap("test-group", metadata));
     }
 
@@ -536,7 +559,7 @@ public class RequestResponseTest {
         if (version == 0) {
             Map<TopicPartition, ListOffsetResponse.PartitionData> responseData = new HashMap<>();
             responseData.put(new TopicPartition("test", 0),
-                    new ListOffsetResponse.PartitionData(Errors.NONE, Arrays.asList(100L)));
+                    new ListOffsetResponse.PartitionData(Errors.NONE, asList(100L)));
             return new ListOffsetResponse(responseData);
         } else if (version == 1) {
             Map<TopicPartition, ListOffsetResponse.PartitionData> responseData = new HashMap<>();
@@ -554,16 +577,16 @@ public class RequestResponseTest {
 
     private MetadataResponse createMetadataResponse() {
         Node node = new Node(1, "host1", 1001);
-        List<Node> replicas = Arrays.asList(node);
-        List<Node> isr = Arrays.asList(node);
+        List<Node> replicas = asList(node);
+        List<Node> isr = asList(node);
 
         List<MetadataResponse.TopicMetadata> allTopicMetadata = new ArrayList<>();
         allTopicMetadata.add(new MetadataResponse.TopicMetadata(Errors.NONE, "__consumer_offsets", true,
-                Arrays.asList(new MetadataResponse.PartitionMetadata(Errors.NONE, 1, node, replicas, isr))));
+                asList(new MetadataResponse.PartitionMetadata(Errors.NONE, 1, node, replicas, isr))));
         allTopicMetadata.add(new MetadataResponse.TopicMetadata(Errors.LEADER_NOT_AVAILABLE, "topic2", false,
                 Collections.<MetadataResponse.PartitionMetadata>emptyList()));
 
-        return new MetadataResponse(Arrays.asList(node), null, MetadataResponse.NO_CONTROLLER_ID, allTopicMetadata);
+        return new MetadataResponse(asList(node), null, MetadataResponse.NO_CONTROLLER_ID, allTopicMetadata);
     }
 
     private OffsetCommitRequest createOffsetCommitRequest(int version) {
@@ -613,7 +636,7 @@ public class RequestResponseTest {
     }
 
     private StopReplicaRequest createStopReplicaRequest(boolean deletePartitions) {
-        Set<TopicPartition> partitions = new HashSet<>(Arrays.asList(new TopicPartition("test", 0)));
+        Set<TopicPartition> partitions = Utils.mkSet(new TopicPartition("test", 0));
         return new StopReplicaRequest.Builder(0, 1, deletePartitions, partitions).build();
     }
 
@@ -628,17 +651,17 @@ public class RequestResponseTest {
     }
 
     private ControlledShutdownResponse createControlledShutdownResponse() {
-        HashSet<TopicPartition> topicPartitions = new HashSet<>(Arrays.asList(
+        Set<TopicPartition> topicPartitions = Utils.mkSet(
                 new TopicPartition("test2", 5),
                 new TopicPartition("test1", 10)
-        ));
+        );
         return new ControlledShutdownResponse(Errors.NONE, topicPartitions);
     }
 
     private LeaderAndIsrRequest createLeaderAndIsrRequest() {
         Map<TopicPartition, PartitionState> partitionStates = new HashMap<>();
-        List<Integer> isr = Arrays.asList(1, 2);
-        List<Integer> replicas = Arrays.asList(1, 2, 3, 4);
+        List<Integer> isr = asList(1, 2);
+        List<Integer> replicas = asList(1, 2, 3, 4);
         partitionStates.put(new TopicPartition("topic5", 105),
                 new PartitionState(0, 2, 1, new ArrayList<>(isr), 2, new HashSet<>(replicas)));
         partitionStates.put(new TopicPartition("topic5", 1),
@@ -646,10 +669,10 @@ public class RequestResponseTest {
         partitionStates.put(new TopicPartition("topic20", 1),
                 new PartitionState(1, 0, 1, new ArrayList<>(isr), 2, new HashSet<>(replicas)));
 
-        Set<Node> leaders = new HashSet<>(Arrays.asList(
+        Set<Node> leaders = Utils.mkSet(
                 new Node(0, "test0", 1223),
                 new Node(1, "test1", 1223)
-        ));
+        );
 
         return new LeaderAndIsrRequest.Builder(1, 10, partitionStates, leaders).build();
     }
@@ -662,8 +685,8 @@ public class RequestResponseTest {
 
     private UpdateMetadataRequest createUpdateMetadataRequest(int version, String rack) {
         Map<TopicPartition, PartitionState> partitionStates = new HashMap<>();
-        List<Integer> isr = Arrays.asList(1, 2);
-        List<Integer> replicas = Arrays.asList(1, 2, 3, 4);
+        List<Integer> isr = asList(1, 2);
+        List<Integer> replicas = asList(1, 2, 3, 4);
         partitionStates.put(new TopicPartition("topic5", 105),
                 new PartitionState(0, 2, 1, new ArrayList<>(isr), 2, new HashSet<>(replicas)));
         partitionStates.put(new TopicPartition("topic5", 1),
@@ -687,10 +710,10 @@ public class RequestResponseTest {
                     new ListenerName("CLIENT")));
         }
 
-        Set<UpdateMetadataRequest.Broker> liveBrokers = new HashSet<>(Arrays.asList(
+        Set<UpdateMetadataRequest.Broker> liveBrokers = Utils.mkSet(
                 new UpdateMetadataRequest.Broker(0, endPoints1, rack),
                 new UpdateMetadataRequest.Broker(1, endPoints2, rack)
-        ));
+        );
         return new UpdateMetadataRequest.Builder((short) version, 1, 10, partitionStates,
                 liveBrokers).build();
     }
@@ -712,7 +735,7 @@ public class RequestResponseTest {
     }
 
     private ApiVersionsResponse createApiVersionResponse() {
-        List<ApiVersionsResponse.ApiVersion> apiVersions = Arrays.asList(new ApiVersionsResponse.ApiVersion((short) 0, (short) 0, (short) 2));
+        List<ApiVersionsResponse.ApiVersion> apiVersions = asList(new ApiVersionsResponse.ApiVersion((short) 0, (short) 0, (short) 2));
         return new ApiVersionsResponse(Errors.NONE, apiVersions);
     }
 
@@ -724,8 +747,8 @@ public class RequestResponseTest {
         CreateTopicsRequest.TopicDetails request1 = new CreateTopicsRequest.TopicDetails(3, (short) 5);
 
         Map<Integer, List<Integer>> replicaAssignments = new HashMap<>();
-        replicaAssignments.put(1, Arrays.asList(1, 2, 3));
-        replicaAssignments.put(2, Arrays.asList(2, 3, 4));
+        replicaAssignments.put(1, asList(1, 2, 3));
+        replicaAssignments.put(2, asList(2, 3, 4));
 
         Map<String, String> configs = new HashMap<>();
         configs.put("config1", "value1");
@@ -746,8 +769,7 @@ public class RequestResponseTest {
     }
 
     private DeleteTopicsRequest createDeleteTopicsRequest() {
-        return new DeleteTopicsRequest.Builder(new HashSet<>(Arrays.asList("my_t1", "my_t2")), 10000).
-                build();
+        return new DeleteTopicsRequest.Builder(Utils.mkSet("my_t1", "my_t2"), 10000).build();
     }
 
     private DeleteTopicsResponse createDeleteTopicsResponse() {