You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2017/03/25 21:08:02 UTC
kafka git commit: MINOR: Fix deserialization of abortedTransactions and lastStableOffset in FetchResponse
Repository: kafka
Updated Branches:
refs/heads/trunk 462767660 -> d348ac92c
MINOR: Fix deserialization of abortedTransactions and lastStableOffset in FetchResponse
Thanks to Dong Lin for finding the lastStableOffset issue.
Author: Ismael Juma <is...@juma.me.uk>
Reviewers: Dong Lin <li...@gmail.com>, Jason Gustafson <ja...@confluent.io>
Closes #2737 from ijuma/fix-fetch-response-lso
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/d348ac92
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/d348ac92
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/d348ac92
Branch: refs/heads/trunk
Commit: d348ac92c80b6426864e3ed68b89bad341b3bda5
Parents: 4627676
Author: Ismael Juma <is...@juma.me.uk>
Authored: Sat Mar 25 14:03:40 2017 -0700
Committer: Jason Gustafson <ja...@confluent.io>
Committed: Sat Mar 25 14:03:40 2017 -0700
----------------------------------------------------------------------
.../kafka/common/requests/FetchResponse.java | 56 ++++++++++++--
.../common/requests/RequestResponseTest.java | 78 +++++++++++++-------
2 files changed, 100 insertions(+), 34 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/d348ac92/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
index 617ebe0..f0a0eee 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
@@ -29,7 +29,6 @@ import org.apache.kafka.common.record.Records;
import java.nio.ByteBuffer;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
@@ -86,6 +85,25 @@ public class FetchResponse extends AbstractResponse {
}
@Override
+ public boolean equals(Object o) {
+ if (this == o)
+ return true;
+ if (o == null || getClass() != o.getClass())
+ return false;
+
+ AbortedTransaction that = (AbortedTransaction) o;
+
+ return pid == that.pid && firstOffset == that.firstOffset;
+ }
+
+ @Override
+ public int hashCode() {
+ int result = (int) (pid ^ (pid >>> 32));
+ result = 31 * result + (int) (firstOffset ^ (firstOffset >>> 32));
+ return result;
+ }
+
+ @Override
public String toString() {
return "(pid=" + pid + ", firstOffset=" + firstOffset + ")";
}
@@ -111,6 +129,32 @@ public class FetchResponse extends AbstractResponse {
}
@Override
+ public boolean equals(Object o) {
+ if (this == o)
+ return true;
+ if (o == null || getClass() != o.getClass())
+ return false;
+
+ PartitionData that = (PartitionData) o;
+
+ return error == that.error &&
+ highWatermark == that.highWatermark &&
+ lastStableOffset == that.lastStableOffset &&
+ (abortedTransactions == null ? that.abortedTransactions == null : abortedTransactions.equals(that.abortedTransactions)) &&
+ (records == null ? that.records == null : records.equals(that.records));
+ }
+
+ @Override
+ public int hashCode() {
+ int result = error != null ? error.hashCode() : 0;
+ result = 31 * result + (int) (lastStableOffset ^ (lastStableOffset >>> 32));
+ result = 31 * result + (int) (highWatermark ^ (highWatermark >>> 32));
+ result = 31 * result + (abortedTransactions != null ? abortedTransactions.hashCode() : 0);
+ result = 31 * result + (records != null ? records.hashCode() : 0);
+ return result;
+ }
+
+ @Override
public String toString() {
return "(error=" + error + ", highWaterMark=" + highWatermark +
", lastStableOffset = " + lastStableOffset + ", " +
@@ -144,14 +188,14 @@ public class FetchResponse extends AbstractResponse {
Errors error = Errors.forCode(partitionResponseHeader.getShort(ERROR_CODE_KEY_NAME));
long highWatermark = partitionResponseHeader.getLong(HIGH_WATERMARK_KEY_NAME);
long lastStableOffset = INVALID_LSO;
- if (partitionResponse.hasField(LAST_STABLE_OFFSET_KEY_NAME))
- lastStableOffset = partitionResponse.getLong(LAST_STABLE_OFFSET_KEY_NAME);
+ if (partitionResponseHeader.hasField(LAST_STABLE_OFFSET_KEY_NAME))
+ lastStableOffset = partitionResponseHeader.getLong(LAST_STABLE_OFFSET_KEY_NAME);
Records records = partitionResponse.getRecords(RECORD_SET_KEY_NAME);
- List<AbortedTransaction> abortedTransactions = Collections.emptyList();
- if (partitionResponse.hasField(ABORTED_TRANSACTIONS_KEY_NAME)) {
- Object[] abortedTransactionsArray = partitionResponse.getArray(ABORTED_TRANSACTIONS_KEY_NAME);
+ List<AbortedTransaction> abortedTransactions = null;
+ if (partitionResponseHeader.hasField(ABORTED_TRANSACTIONS_KEY_NAME)) {
+ Object[] abortedTransactionsArray = partitionResponseHeader.getArray(ABORTED_TRANSACTIONS_KEY_NAME);
if (abortedTransactionsArray != null) {
abortedTransactions = new ArrayList<>(abortedTransactionsArray.length);
for (Object abortedTransactionObj : abortedTransactionsArray) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/d348ac92/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index b9fbf06..8a6d69a 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -35,6 +35,7 @@ import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.MemoryRecordsBuilder;
import org.apache.kafka.common.record.TimestampType;
+import org.apache.kafka.common.utils.Utils;
import org.junit.Test;
import java.io.IOException;
@@ -43,7 +44,6 @@ import java.lang.reflect.Method;
import java.nio.ByteBuffer;
import java.nio.channels.GatheringByteChannel;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@@ -52,6 +52,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
+import static java.util.Arrays.asList;
import static java.util.Collections.singletonList;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@@ -91,10 +92,10 @@ public class RequestResponseTest {
checkErrorResponse(createListOffsetRequest(1), new UnknownServerException());
checkResponse(createListOffsetResponse(1), 1);
checkRequest(MetadataRequest.Builder.allTopics().build((short) 2));
- checkRequest(createMetadataRequest(1, Arrays.asList("topic1")));
- checkErrorResponse(createMetadataRequest(1, Arrays.asList("topic1")), new UnknownServerException());
+ checkRequest(createMetadataRequest(1, asList("topic1")));
+ checkErrorResponse(createMetadataRequest(1, asList("topic1")), new UnknownServerException());
checkResponse(createMetadataResponse(), 2);
- checkErrorResponse(createMetadataRequest(2, Arrays.asList("topic1")), new UnknownServerException());
+ checkErrorResponse(createMetadataRequest(2, asList("topic1")), new UnknownServerException());
checkRequest(createOffsetCommitRequest(2));
checkErrorResponse(createOffsetCommitRequest(2), new UnknownServerException());
checkResponse(createOffsetCommitResponse(), 0);
@@ -138,7 +139,7 @@ public class RequestResponseTest {
checkOlderFetchVersions();
checkResponse(createMetadataResponse(), 0);
checkResponse(createMetadataResponse(), 1);
- checkErrorResponse(createMetadataRequest(1, Arrays.asList("topic1")), new UnknownServerException());
+ checkErrorResponse(createMetadataRequest(1, asList("topic1")), new UnknownServerException());
checkRequest(createOffsetCommitRequest(0));
checkErrorResponse(createOffsetCommitRequest(0), new UnknownServerException());
checkRequest(createOffsetCommitRequest(1));
@@ -362,6 +363,28 @@ public class RequestResponseTest {
}
@Test
+ public void testFetchResponseV4() {
+ LinkedHashMap<TopicPartition, FetchResponse.PartitionData> responseData = new LinkedHashMap<>();
+
+ MemoryRecords records = MemoryRecords.readableRecords(ByteBuffer.allocate(10));
+
+ List<FetchResponse.AbortedTransaction> abortedTransactions = asList(
+ new FetchResponse.AbortedTransaction(10, 100),
+ new FetchResponse.AbortedTransaction(15, 50)
+ );
+ responseData.put(new TopicPartition("bar", 0), new FetchResponse.PartitionData(Errors.NONE, 100000,
+ FetchResponse.INVALID_LSO, abortedTransactions, records));
+ responseData.put(new TopicPartition("bar", 1), new FetchResponse.PartitionData(Errors.NONE, 900000,
+ 5, null, records));
+ responseData.put(new TopicPartition("foo", 0), new FetchResponse.PartitionData(Errors.NONE, 70000,
+ 6, Collections.<FetchResponse.AbortedTransaction>emptyList(), records));
+
+ FetchResponse response = new FetchResponse(responseData, 10);
+ FetchResponse deserialized = FetchResponse.parse(toBuffer(response.toStruct((short) 4)), (short) 4);
+ assertEquals(responseData, deserialized.responseData());
+ }
+
+ @Test
public void verifyFetchResponseFullWrite() throws Exception {
FetchResponse fetchResponse = createFetchResponse();
RequestHeader header = new RequestHeader(ApiKeys.FETCH.id, ApiKeys.FETCH.latestVersion(),
@@ -488,7 +511,7 @@ public class RequestResponseTest {
}
private ListGroupsResponse createListGroupsResponse() {
- List<ListGroupsResponse.Group> groups = Arrays.asList(new ListGroupsResponse.Group("test-group", "consumer"));
+ List<ListGroupsResponse.Group> groups = asList(new ListGroupsResponse.Group("test-group", "consumer"));
return new ListGroupsResponse(Errors.NONE, groups);
}
@@ -503,7 +526,7 @@ public class RequestResponseTest {
DescribeGroupsResponse.GroupMember member = new DescribeGroupsResponse.GroupMember("memberId",
clientId, clientHost, empty, empty);
DescribeGroupsResponse.GroupMetadata metadata = new DescribeGroupsResponse.GroupMetadata(Errors.NONE,
- "STABLE", "consumer", "roundrobin", Arrays.asList(member));
+ "STABLE", "consumer", "roundrobin", asList(member));
return new DescribeGroupsResponse(Collections.singletonMap("test-group", metadata));
}
@@ -536,7 +559,7 @@ public class RequestResponseTest {
if (version == 0) {
Map<TopicPartition, ListOffsetResponse.PartitionData> responseData = new HashMap<>();
responseData.put(new TopicPartition("test", 0),
- new ListOffsetResponse.PartitionData(Errors.NONE, Arrays.asList(100L)));
+ new ListOffsetResponse.PartitionData(Errors.NONE, asList(100L)));
return new ListOffsetResponse(responseData);
} else if (version == 1) {
Map<TopicPartition, ListOffsetResponse.PartitionData> responseData = new HashMap<>();
@@ -554,16 +577,16 @@ public class RequestResponseTest {
private MetadataResponse createMetadataResponse() {
Node node = new Node(1, "host1", 1001);
- List<Node> replicas = Arrays.asList(node);
- List<Node> isr = Arrays.asList(node);
+ List<Node> replicas = asList(node);
+ List<Node> isr = asList(node);
List<MetadataResponse.TopicMetadata> allTopicMetadata = new ArrayList<>();
allTopicMetadata.add(new MetadataResponse.TopicMetadata(Errors.NONE, "__consumer_offsets", true,
- Arrays.asList(new MetadataResponse.PartitionMetadata(Errors.NONE, 1, node, replicas, isr))));
+ asList(new MetadataResponse.PartitionMetadata(Errors.NONE, 1, node, replicas, isr))));
allTopicMetadata.add(new MetadataResponse.TopicMetadata(Errors.LEADER_NOT_AVAILABLE, "topic2", false,
Collections.<MetadataResponse.PartitionMetadata>emptyList()));
- return new MetadataResponse(Arrays.asList(node), null, MetadataResponse.NO_CONTROLLER_ID, allTopicMetadata);
+ return new MetadataResponse(asList(node), null, MetadataResponse.NO_CONTROLLER_ID, allTopicMetadata);
}
private OffsetCommitRequest createOffsetCommitRequest(int version) {
@@ -613,7 +636,7 @@ public class RequestResponseTest {
}
private StopReplicaRequest createStopReplicaRequest(boolean deletePartitions) {
- Set<TopicPartition> partitions = new HashSet<>(Arrays.asList(new TopicPartition("test", 0)));
+ Set<TopicPartition> partitions = Utils.mkSet(new TopicPartition("test", 0));
return new StopReplicaRequest.Builder(0, 1, deletePartitions, partitions).build();
}
@@ -628,17 +651,17 @@ public class RequestResponseTest {
}
private ControlledShutdownResponse createControlledShutdownResponse() {
- HashSet<TopicPartition> topicPartitions = new HashSet<>(Arrays.asList(
+ Set<TopicPartition> topicPartitions = Utils.mkSet(
new TopicPartition("test2", 5),
new TopicPartition("test1", 10)
- ));
+ );
return new ControlledShutdownResponse(Errors.NONE, topicPartitions);
}
private LeaderAndIsrRequest createLeaderAndIsrRequest() {
Map<TopicPartition, PartitionState> partitionStates = new HashMap<>();
- List<Integer> isr = Arrays.asList(1, 2);
- List<Integer> replicas = Arrays.asList(1, 2, 3, 4);
+ List<Integer> isr = asList(1, 2);
+ List<Integer> replicas = asList(1, 2, 3, 4);
partitionStates.put(new TopicPartition("topic5", 105),
new PartitionState(0, 2, 1, new ArrayList<>(isr), 2, new HashSet<>(replicas)));
partitionStates.put(new TopicPartition("topic5", 1),
@@ -646,10 +669,10 @@ public class RequestResponseTest {
partitionStates.put(new TopicPartition("topic20", 1),
new PartitionState(1, 0, 1, new ArrayList<>(isr), 2, new HashSet<>(replicas)));
- Set<Node> leaders = new HashSet<>(Arrays.asList(
+ Set<Node> leaders = Utils.mkSet(
new Node(0, "test0", 1223),
new Node(1, "test1", 1223)
- ));
+ );
return new LeaderAndIsrRequest.Builder(1, 10, partitionStates, leaders).build();
}
@@ -662,8 +685,8 @@ public class RequestResponseTest {
private UpdateMetadataRequest createUpdateMetadataRequest(int version, String rack) {
Map<TopicPartition, PartitionState> partitionStates = new HashMap<>();
- List<Integer> isr = Arrays.asList(1, 2);
- List<Integer> replicas = Arrays.asList(1, 2, 3, 4);
+ List<Integer> isr = asList(1, 2);
+ List<Integer> replicas = asList(1, 2, 3, 4);
partitionStates.put(new TopicPartition("topic5", 105),
new PartitionState(0, 2, 1, new ArrayList<>(isr), 2, new HashSet<>(replicas)));
partitionStates.put(new TopicPartition("topic5", 1),
@@ -687,10 +710,10 @@ public class RequestResponseTest {
new ListenerName("CLIENT")));
}
- Set<UpdateMetadataRequest.Broker> liveBrokers = new HashSet<>(Arrays.asList(
+ Set<UpdateMetadataRequest.Broker> liveBrokers = Utils.mkSet(
new UpdateMetadataRequest.Broker(0, endPoints1, rack),
new UpdateMetadataRequest.Broker(1, endPoints2, rack)
- ));
+ );
return new UpdateMetadataRequest.Builder((short) version, 1, 10, partitionStates,
liveBrokers).build();
}
@@ -712,7 +735,7 @@ public class RequestResponseTest {
}
private ApiVersionsResponse createApiVersionResponse() {
- List<ApiVersionsResponse.ApiVersion> apiVersions = Arrays.asList(new ApiVersionsResponse.ApiVersion((short) 0, (short) 0, (short) 2));
+ List<ApiVersionsResponse.ApiVersion> apiVersions = asList(new ApiVersionsResponse.ApiVersion((short) 0, (short) 0, (short) 2));
return new ApiVersionsResponse(Errors.NONE, apiVersions);
}
@@ -724,8 +747,8 @@ public class RequestResponseTest {
CreateTopicsRequest.TopicDetails request1 = new CreateTopicsRequest.TopicDetails(3, (short) 5);
Map<Integer, List<Integer>> replicaAssignments = new HashMap<>();
- replicaAssignments.put(1, Arrays.asList(1, 2, 3));
- replicaAssignments.put(2, Arrays.asList(2, 3, 4));
+ replicaAssignments.put(1, asList(1, 2, 3));
+ replicaAssignments.put(2, asList(2, 3, 4));
Map<String, String> configs = new HashMap<>();
configs.put("config1", "value1");
@@ -746,8 +769,7 @@ public class RequestResponseTest {
}
private DeleteTopicsRequest createDeleteTopicsRequest() {
- return new DeleteTopicsRequest.Builder(new HashSet<>(Arrays.asList("my_t1", "my_t2")), 10000).
- build();
+ return new DeleteTopicsRequest.Builder(Utils.mkSet("my_t1", "my_t2"), 10000).build();
}
private DeleteTopicsResponse createDeleteTopicsResponse() {