You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by rs...@apache.org on 2017/05/11 20:18:45 UTC
kafka git commit: MINOR: JoinGroupRequest V0 invalid rebalance timeout
Repository: kafka
Updated Branches:
refs/heads/trunk 24a4e6146 -> 27107ee34
MINOR: JoinGroupRequest V0 invalid rebalance timeout
A JoinGroupRequest V0 built with the Builder had
a rebalance timeout = -1 rather than equal to session timeout
as it would have been if coming from the wire and deserialized
from a V0 Struct
fix developed with mimaison
Author: Edoardo Comar <ec...@uk.ibm.com>
Reviewers: Rajini Sivaram
Closes #2936 from edoardocomar/MINOR-JoinGroupRequestV0
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/27107ee3
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/27107ee3
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/27107ee3
Branch: refs/heads/trunk
Commit: 27107ee34d1df89035eb9b9b4e11036fca6cf723
Parents: 24a4e61
Author: Edoardo Comar <ec...@uk.ibm.com>
Authored: Thu May 11 16:17:34 2017 -0400
Committer: Rajini Sivaram <ra...@googlemail.com>
Committed: Thu May 11 16:17:34 2017 -0400
----------------------------------------------------------------------
.../apache/kafka/common/requests/JoinGroupRequest.java | 3 ++-
.../kafka/common/requests/RequestResponseTest.java | 11 +++++++++--
2 files changed, 11 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/27107ee3/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java
index 1080fe7..ff07d13 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java
@@ -90,7 +90,8 @@ public class JoinGroupRequest extends AbstractRequest {
@Override
public JoinGroupRequest build(short version) {
if (version < 1) {
- rebalanceTimeout = -1;
+ // v0 had no rebalance timeout but used session timeout implicitly
+ rebalanceTimeout = sessionTimeout;
}
return new JoinGroupRequest(version, groupId, sessionTimeout,
rebalanceTimeout, memberId, protocolType, groupProtocols);
http://git-wip-us.apache.org/repos/asf/kafka/blob/27107ee3/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index 6443e4d..b1e83bf 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -511,7 +511,15 @@ public class RequestResponseTest {
deserialized = (FetchRequest) deserialize(request, struct, request.version());
assertEquals(request.isolationLevel(), deserialized.isolationLevel());
}
-
+
+ @Test
+ public void testJoinGroupRequestVersion0RebalanceTimeout() throws Exception {
+ final short version = 0;
+ JoinGroupRequest jgr = createJoinGroupRequest(version);
+ JoinGroupRequest jgr2 = new JoinGroupRequest(jgr.toStruct(), version);
+ assertEquals(jgr2.rebalanceTimeout(), jgr.rebalanceTimeout());
+ }
+
private RequestHeader createRequestHeader() {
return new RequestHeader((short) 10, (short) 1, "", 10);
}
@@ -565,7 +573,6 @@ public class RequestResponseTest {
return new HeartbeatResponse(Errors.NONE);
}
- @SuppressWarnings("deprecation")
private JoinGroupRequest createJoinGroupRequest(int version) {
ByteBuffer metadata = ByteBuffer.wrap(new byte[] {});
List<JoinGroupRequest.ProtocolMetadata> protocols = new ArrayList<>();