You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2016/11/15 00:46:29 UTC
[3/5] kafka git commit: KAFKA-2066;
Use client-side FetchRequest/FetchResponse on server
http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/clients/src/main/java/org/apache/kafka/common/requests/RequestSend.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/RequestSend.java b/clients/src/main/java/org/apache/kafka/common/requests/RequestSend.java
deleted file mode 100644
index 02cac80..0000000
--- a/clients/src/main/java/org/apache/kafka/common/requests/RequestSend.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package org.apache.kafka.common.requests;
-
-import java.nio.ByteBuffer;
-
-import org.apache.kafka.common.network.NetworkSend;
-import org.apache.kafka.common.protocol.types.Struct;
-
-/**
- * A send object for a kafka request
- */
-public class RequestSend extends NetworkSend {
-
- private final RequestHeader header;
- private final Struct body;
-
- public RequestSend(String destination, RequestHeader header, Struct body) {
- super(destination, serialize(header, body));
- this.header = header;
- this.body = body;
- }
-
- public static ByteBuffer serialize(RequestHeader header, Struct body) {
- ByteBuffer buffer = ByteBuffer.allocate(header.sizeOf() + body.sizeOf());
- header.writeTo(buffer);
- body.writeTo(buffer);
- buffer.rewind();
- return buffer;
- }
-
- public RequestHeader header() {
- return this.header;
- }
-
- public Struct body() {
- return body;
- }
-
- @Override
- public String toString() {
- return "RequestSend(header=" + header.toString() + ", body=" + body.toString() + ")";
- }
-
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/clients/src/main/java/org/apache/kafka/common/requests/ResponseSend.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ResponseSend.java b/clients/src/main/java/org/apache/kafka/common/requests/ResponseSend.java
deleted file mode 100644
index 9494de7..0000000
--- a/clients/src/main/java/org/apache/kafka/common/requests/ResponseSend.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.common.requests;
-
-import org.apache.kafka.common.network.NetworkSend;
-import org.apache.kafka.common.protocol.types.Struct;
-
-import java.nio.ByteBuffer;
-
-public class ResponseSend extends NetworkSend {
-
- public ResponseSend(String destination, ResponseHeader header, Struct body) {
- super(destination, serialize(header, body));
- }
-
- public ResponseSend(String destination, ResponseHeader header, AbstractRequestResponse response) {
- this(destination, header, response.toStruct());
- }
-
- public static ByteBuffer serialize(ResponseHeader header, Struct body) {
- ByteBuffer buffer = ByteBuffer.allocate(header.sizeOf() + body.sizeOf());
- header.writeTo(buffer);
- body.writeTo(buffer);
- buffer.rewind();
- return buffer;
- }
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/clients/src/main/java/org/apache/kafka/common/requests/SaslHandshakeRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/SaslHandshakeRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/SaslHandshakeRequest.java
index bddc9f0..97adcb3 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/SaslHandshakeRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/SaslHandshakeRequest.java
@@ -61,7 +61,7 @@ public class SaslHandshakeRequest extends AbstractRequest {
}
@Override
- public AbstractRequestResponse getErrorResponse(int versionId, Throwable e) {
+ public AbstractResponse getErrorResponse(int versionId, Throwable e) {
switch (versionId) {
case 0:
List<String> enabledMechanisms = Collections.emptyList();
http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/clients/src/main/java/org/apache/kafka/common/requests/SaslHandshakeResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/SaslHandshakeResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/SaslHandshakeResponse.java
index c0fc495..6d7f734 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/SaslHandshakeResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/SaslHandshakeResponse.java
@@ -33,7 +33,7 @@ import org.apache.kafka.common.protocol.types.Struct;
* Response from SASL server which indicates if the client-chosen mechanism is enabled in the server.
* For error responses, the list of enabled mechanisms is included in the response.
*/
-public class SaslHandshakeResponse extends AbstractRequestResponse {
+public class SaslHandshakeResponse extends AbstractResponse {
private static final Schema CURRENT_SCHEMA = ProtoUtils.currentResponseSchema(ApiKeys.SASL_HANDSHAKE.id);
http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/clients/src/main/java/org/apache/kafka/common/requests/StopReplicaRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/StopReplicaRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/StopReplicaRequest.java
index bc63521..e3e2507 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/StopReplicaRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/StopReplicaRequest.java
@@ -84,7 +84,7 @@ public class StopReplicaRequest extends AbstractRequest {
}
@Override
- public AbstractRequestResponse getErrorResponse(int versionId, Throwable e) {
+ public AbstractResponse getErrorResponse(int versionId, Throwable e) {
Map<TopicPartition, Short> responses = new HashMap<>(partitions.size());
for (TopicPartition partition : partitions) {
responses.put(partition, Errors.forException(e).code());
http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/clients/src/main/java/org/apache/kafka/common/requests/StopReplicaResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/StopReplicaResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/StopReplicaResponse.java
index 3c4db68..92d9e58 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/StopReplicaResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/StopReplicaResponse.java
@@ -26,7 +26,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
-public class StopReplicaResponse extends AbstractRequestResponse {
+public class StopReplicaResponse extends AbstractResponse {
private static final Schema CURRENT_SCHEMA = ProtoUtils.currentResponseSchema(ApiKeys.STOP_REPLICA.id);
private static final String ERROR_CODE_KEY_NAME = "error_code";
http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupRequest.java
index 606584b..efb484c 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupRequest.java
@@ -83,7 +83,7 @@ public class SyncGroupRequest extends AbstractRequest {
}
@Override
- public AbstractRequestResponse getErrorResponse(int versionId, Throwable e) {
+ public AbstractResponse getErrorResponse(int versionId, Throwable e) {
switch (versionId) {
case 0:
return new SyncGroupResponse(
http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupResponse.java
index 3584f14..f459656 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupResponse.java
@@ -23,7 +23,7 @@ import org.apache.kafka.common.protocol.types.Struct;
import java.nio.ByteBuffer;
-public class SyncGroupResponse extends AbstractRequestResponse {
+public class SyncGroupResponse extends AbstractResponse {
private static final Schema CURRENT_SCHEMA = ProtoUtils.currentResponseSchema(ApiKeys.SYNC_GROUP.id);
public static final String ERROR_CODE_KEY_NAME = "error_code";
http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataRequest.java
index 1c21789..2e5ffb2 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataRequest.java
@@ -242,7 +242,7 @@ public class UpdateMetadataRequest extends AbstractRequest {
}
@Override
- public AbstractRequestResponse getErrorResponse(int versionId, Throwable e) {
+ public AbstractResponse getErrorResponse(int versionId, Throwable e) {
if (versionId <= 2)
return new UpdateMetadataResponse(Errors.forException(e).code());
else
http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataResponse.java
index 0ff35d9..e1c3634 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataResponse.java
@@ -20,7 +20,7 @@ import org.apache.kafka.common.protocol.types.Struct;
import java.nio.ByteBuffer;
-public class UpdateMetadataResponse extends AbstractRequestResponse {
+public class UpdateMetadataResponse extends AbstractResponse {
private static final Schema CURRENT_SCHEMA = ProtoUtils.currentResponseSchema(ApiKeys.UPDATE_METADATA_KEY.id);
http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java
index ba201dc..b9c3b33 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java
@@ -18,45 +18,44 @@
package org.apache.kafka.common.security.authenticator;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.channels.SelectionKey;
-import java.util.Arrays;
-import java.util.Map;
-import java.security.Principal;
-import java.security.PrivilegedActionException;
-import java.security.PrivilegedExceptionAction;
-
-import javax.security.auth.Subject;
-import javax.security.sasl.Sasl;
-import javax.security.sasl.SaslClient;
-import javax.security.sasl.SaslException;
-
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.NetworkClient;
+import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.AuthenticationException;
import org.apache.kafka.common.errors.IllegalSaslStateException;
import org.apache.kafka.common.errors.UnsupportedSaslMechanismException;
import org.apache.kafka.common.network.Authenticator;
import org.apache.kafka.common.network.Mode;
-import org.apache.kafka.common.network.NetworkSend;
import org.apache.kafka.common.network.NetworkReceive;
+import org.apache.kafka.common.network.NetworkSend;
+import org.apache.kafka.common.network.Send;
import org.apache.kafka.common.network.TransportLayer;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.protocol.types.SchemaException;
-import org.apache.kafka.common.protocol.types.Struct;
+import org.apache.kafka.common.requests.AbstractResponse;
import org.apache.kafka.common.requests.RequestHeader;
-import org.apache.kafka.common.requests.RequestSend;
import org.apache.kafka.common.requests.SaslHandshakeRequest;
import org.apache.kafka.common.requests.SaslHandshakeResponse;
import org.apache.kafka.common.security.auth.AuthCallbackHandler;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.kafka.common.security.auth.PrincipalBuilder;
-import org.apache.kafka.clients.CommonClientConfigs;
-import org.apache.kafka.clients.NetworkClient;
-import org.apache.kafka.common.KafkaException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.security.auth.Subject;
+import javax.security.sasl.Sasl;
+import javax.security.sasl.SaslClient;
+import javax.security.sasl.SaslException;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.SelectionKey;
+import java.security.Principal;
+import java.security.PrivilegedActionException;
+import java.security.PrivilegedExceptionAction;
+import java.util.Arrays;
+import java.util.Map;
+
public class SaslClientAuthenticator implements Authenticator {
public enum SaslState {
@@ -81,7 +80,7 @@ public class SaslClientAuthenticator implements Authenticator {
// buffers used in `authenticate`
private NetworkReceive netInBuffer;
- private NetworkSend netOutBuffer;
+ private Send netOutBuffer;
// Current SASL state
private SaslState saslState;
@@ -156,7 +155,7 @@ public class SaslClientAuthenticator implements Authenticator {
String clientId = (String) configs.get(CommonClientConfigs.CLIENT_ID_CONFIG);
currentRequestHeader = new RequestHeader(ApiKeys.SASL_HANDSHAKE.id, clientId, correlationId++);
SaslHandshakeRequest handshakeRequest = new SaslHandshakeRequest(mechanism);
- send(RequestSend.serialize(currentRequestHeader, handshakeRequest.toStruct()));
+ send(handshakeRequest.toSend(node, currentRequestHeader));
setSaslState(SaslState.RECEIVE_HANDSHAKE_RESPONSE);
break;
case RECEIVE_HANDSHAKE_RESPONSE:
@@ -209,13 +208,13 @@ public class SaslClientAuthenticator implements Authenticator {
if (!saslClient.isComplete()) {
byte[] saslToken = createSaslToken(serverToken, isInitial);
if (saslToken != null)
- send(ByteBuffer.wrap(saslToken));
+ send(new NetworkSend(node, ByteBuffer.wrap(saslToken)));
}
}
- private void send(ByteBuffer buffer) throws IOException {
+ private void send(Send send) throws IOException {
try {
- netOutBuffer = new NetworkSend(node, buffer);
+ netOutBuffer = send;
flushNetOutBufferAndUpdateInterestOps();
} catch (IOException e) {
setSaslState(SaslState.FAILED);
@@ -281,7 +280,7 @@ public class SaslClientAuthenticator implements Authenticator {
// TODO: introspect about e: look for GSS information.
final String unknownServerErrorText =
"(Mechanism level: Server not found in Kerberos database (7) - UNKNOWN_SERVER)";
- if (e.toString().indexOf(unknownServerErrorText) > -1) {
+ if (e.toString().contains(unknownServerErrorText)) {
error += " This may be caused by Java's being unable to resolve the Kafka Broker's" +
" hostname correctly. You may want to try to adding" +
" '-Dsun.net.spi.nameservice.provider.1=dns,sun' to your client's JVMFLAGS environment." +
@@ -302,10 +301,10 @@ public class SaslClientAuthenticator implements Authenticator {
}
private void handleKafkaResponse(RequestHeader requestHeader, byte[] responseBytes) {
- Struct struct;
+ AbstractResponse response;
ApiKeys apiKey;
try {
- struct = NetworkClient.parseResponse(ByteBuffer.wrap(responseBytes), requestHeader);
+ response = NetworkClient.parseResponse(ByteBuffer.wrap(responseBytes), requestHeader);
apiKey = ApiKeys.forId(requestHeader.apiKey());
} catch (SchemaException | IllegalArgumentException e) {
LOG.debug("Invalid SASL mechanism response, server may be expecting only GSSAPI tokens");
@@ -313,7 +312,7 @@ public class SaslClientAuthenticator implements Authenticator {
}
switch (apiKey) {
case SASL_HANDSHAKE:
- handleSaslHandshakeResponse(new SaslHandshakeResponse(struct));
+ handleSaslHandshakeResponse((SaslHandshakeResponse) response);
break;
default:
throw new IllegalStateException("Unexpected API key during handshake: " + apiKey);
http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
index 206fe39..b193bf2 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
@@ -17,36 +17,7 @@
package org.apache.kafka.common.security.authenticator;
-import java.io.IOException;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.nio.ByteBuffer;
-import java.nio.channels.SelectionKey;
-import java.security.Principal;
-import java.security.PrivilegedActionException;
-import java.security.PrivilegedExceptionAction;
-
-import javax.security.auth.login.Configuration;
-import javax.security.auth.Subject;
-import javax.security.sasl.Sasl;
-import javax.security.sasl.SaslServer;
-import javax.security.sasl.SaslException;
-
import org.apache.kafka.common.KafkaException;
-import org.apache.kafka.common.security.kerberos.KerberosName;
-import org.apache.kafka.common.security.kerberos.KerberosShortNamer;
-import org.ietf.jgss.GSSContext;
-import org.ietf.jgss.GSSCredential;
-import org.ietf.jgss.GSSException;
-import org.ietf.jgss.GSSManager;
-import org.ietf.jgss.GSSName;
-import org.ietf.jgss.Oid;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.kafka.common.security.auth.AuthCallbackHandler;
-import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.errors.AuthenticationException;
import org.apache.kafka.common.errors.IllegalSaslStateException;
@@ -54,22 +25,49 @@ import org.apache.kafka.common.errors.UnsupportedSaslMechanismException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.network.Authenticator;
import org.apache.kafka.common.network.Mode;
-import org.apache.kafka.common.network.NetworkSend;
import org.apache.kafka.common.network.NetworkReceive;
+import org.apache.kafka.common.network.NetworkSend;
+import org.apache.kafka.common.network.Send;
import org.apache.kafka.common.network.TransportLayer;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.protocol.Protocol;
import org.apache.kafka.common.protocol.types.SchemaException;
import org.apache.kafka.common.requests.AbstractRequest;
-import org.apache.kafka.common.requests.AbstractRequestResponse;
+import org.apache.kafka.common.requests.AbstractResponse;
import org.apache.kafka.common.requests.ApiVersionsResponse;
import org.apache.kafka.common.requests.RequestHeader;
-import org.apache.kafka.common.requests.ResponseHeader;
-import org.apache.kafka.common.requests.ResponseSend;
import org.apache.kafka.common.requests.SaslHandshakeRequest;
import org.apache.kafka.common.requests.SaslHandshakeResponse;
+import org.apache.kafka.common.security.auth.AuthCallbackHandler;
+import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.kafka.common.security.auth.PrincipalBuilder;
+import org.apache.kafka.common.security.kerberos.KerberosName;
+import org.apache.kafka.common.security.kerberos.KerberosShortNamer;
+import org.ietf.jgss.GSSContext;
+import org.ietf.jgss.GSSCredential;
+import org.ietf.jgss.GSSException;
+import org.ietf.jgss.GSSManager;
+import org.ietf.jgss.GSSName;
+import org.ietf.jgss.Oid;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.security.auth.Subject;
+import javax.security.auth.login.Configuration;
+import javax.security.sasl.Sasl;
+import javax.security.sasl.SaslException;
+import javax.security.sasl.SaslServer;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.SelectionKey;
+import java.security.Principal;
+import java.security.PrivilegedActionException;
+import java.security.PrivilegedExceptionAction;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
public class SaslServerAuthenticator implements Authenticator {
@@ -100,7 +98,7 @@ public class SaslServerAuthenticator implements Authenticator {
// buffers used in `authenticate`
private NetworkReceive netInBuffer;
- private NetworkSend netOutBuffer;
+ private Send netOutBuffer;
public SaslServerAuthenticator(String node, final Subject subject, KerberosShortNamer kerberosNameParser, String host, int maxReceiveSize) throws IOException {
if (subject == null)
@@ -364,9 +362,8 @@ public class SaslServerAuthenticator implements Authenticator {
sendKafkaResponse(requestHeader, ApiVersionsResponse.apiVersionsResponse());
}
- private void sendKafkaResponse(RequestHeader requestHeader, AbstractRequestResponse response) throws IOException {
- ResponseHeader responseHeader = new ResponseHeader(requestHeader.correlationId());
- netOutBuffer = new NetworkSend(node, ResponseSend.serialize(responseHeader, response.toStruct()));
+ private void sendKafkaResponse(RequestHeader requestHeader, AbstractResponse response) throws IOException {
+ netOutBuffer = response.toSend(node, requestHeader);
flushNetOutBufferAndUpdateInterestOps();
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/clients/src/main/java/org/apache/kafka/common/utils/AbstractIterator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/utils/AbstractIterator.java b/clients/src/main/java/org/apache/kafka/common/utils/AbstractIterator.java
index b798e73..d6771a3 100644
--- a/clients/src/main/java/org/apache/kafka/common/utils/AbstractIterator.java
+++ b/clients/src/main/java/org/apache/kafka/common/utils/AbstractIterator.java
@@ -25,9 +25,9 @@ import java.util.NoSuchElementException;
*/
public abstract class AbstractIterator<T> implements Iterator<T> {
- private static enum State {
+ private enum State {
READY, NOT_READY, DONE, FAILED
- };
+ }
private State state = State.NOT_READY;
private T next;
http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/clients/src/test/java/org/apache/kafka/clients/MockClient.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/MockClient.java b/clients/src/test/java/org/apache/kafka/clients/MockClient.java
index 0af4d34..9b445c7 100644
--- a/clients/src/test/java/org/apache/kafka/clients/MockClient.java
+++ b/clients/src/test/java/org/apache/kafka/clients/MockClient.java
@@ -16,6 +16,14 @@
*/
package org.apache.kafka.clients;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.requests.AbstractRequest;
+import org.apache.kafka.common.requests.AbstractResponse;
+import org.apache.kafka.common.requests.RequestHeader;
+import org.apache.kafka.common.utils.Time;
+
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
@@ -26,31 +34,24 @@ import java.util.Map;
import java.util.Queue;
import java.util.Set;
-import org.apache.kafka.common.Cluster;
-import org.apache.kafka.common.Node;
-import org.apache.kafka.common.protocol.ApiKeys;
-import org.apache.kafka.common.protocol.types.Struct;
-import org.apache.kafka.common.requests.RequestHeader;
-import org.apache.kafka.common.utils.Time;
-
/**
* A mock network client for use testing code
*/
public class MockClient implements KafkaClient {
public static final RequestMatcher ALWAYS_TRUE = new RequestMatcher() {
@Override
- public boolean matches(ClientRequest request) {
+ public boolean matches(AbstractRequest body) {
return true;
}
};
private class FutureResponse {
- public final Struct responseBody;
+ public final AbstractResponse responseBody;
public final boolean disconnected;
public final RequestMatcher requestMatcher;
public Node node;
- public FutureResponse(Struct responseBody, boolean disconnected, RequestMatcher requestMatcher, Node node) {
+ public FutureResponse(AbstractResponse responseBody, boolean disconnected, RequestMatcher requestMatcher, Node node) {
this.responseBody = responseBody;
this.disconnected = disconnected;
this.requestMatcher = requestMatcher;
@@ -125,8 +126,9 @@ public class MockClient implements KafkaClient {
Iterator<ClientRequest> iter = requests.iterator();
while (iter.hasNext()) {
ClientRequest request = iter.next();
- if (request.request().destination().equals(node)) {
- responses.add(new ClientResponse(request, now, true, null));
+ if (request.destination().equals(node)) {
+ responses.add(new ClientResponse(request.header(), request.callback(), request.destination(),
+ request.createdTimeMs(), now, true, null));
iter.remove();
}
}
@@ -138,19 +140,19 @@ public class MockClient implements KafkaClient {
Iterator<FutureResponse> iterator = futureResponses.iterator();
while (iterator.hasNext()) {
FutureResponse futureResp = iterator.next();
- if (futureResp.node != null && !request.request().destination().equals(futureResp.node.idString()))
+ if (futureResp.node != null && !request.destination().equals(futureResp.node.idString()))
continue;
- if (!futureResp.requestMatcher.matches(request))
+ if (!futureResp.requestMatcher.matches(request.body()))
throw new IllegalStateException("Next in line response did not match expected request");
- ClientResponse resp = new ClientResponse(request, time.milliseconds(), futureResp.disconnected, futureResp.responseBody);
+ ClientResponse resp = new ClientResponse(request.header(), request.callback(), request.destination(),
+ request.createdTimeMs(), time.milliseconds(), futureResp.disconnected, futureResp.responseBody);
responses.add(resp);
iterator.remove();
return;
}
- request.setSendTimeMs(now);
this.requests.add(request);
}
@@ -168,8 +170,7 @@ public class MockClient implements KafkaClient {
while (!this.responses.isEmpty()) {
ClientResponse response = this.responses.poll();
- if (response.request().hasCallback())
- response.request().callback().onComplete(response);
+ response.onComplete();
}
return copy;
@@ -179,75 +180,77 @@ public class MockClient implements KafkaClient {
return this.requests;
}
- public void respond(Struct body) {
- respond(body, false);
+ public void respond(AbstractResponse response) {
+ respond(response, false);
}
- public void respond(Struct body, boolean disconnected) {
+ public void respond(AbstractResponse response, boolean disconnected) {
ClientRequest request = requests.remove();
- responses.add(new ClientResponse(request, time.milliseconds(), disconnected, body));
+ responses.add(new ClientResponse(request.header(), request.callback(), request.destination(),
+ request.createdTimeMs(), time.milliseconds(), disconnected, response));
}
- public void respondFrom(Struct body, Node node) {
- respondFrom(body, node, false);
+ public void respondFrom(AbstractResponse response, Node node) {
+ respondFrom(response, node, false);
}
- public void respondFrom(Struct body, Node node, boolean disconnected) {
+ public void respondFrom(AbstractResponse response, Node node, boolean disconnected) {
Iterator<ClientRequest> iterator = requests.iterator();
while (iterator.hasNext()) {
ClientRequest request = iterator.next();
- if (request.request().destination().equals(node.idString())) {
+ if (request.destination().equals(node.idString())) {
iterator.remove();
- responses.add(new ClientResponse(request, time.milliseconds(), disconnected, body));
+ responses.add(new ClientResponse(request.header(), request.callback(), request.destination(),
+ request.createdTimeMs(), time.milliseconds(), disconnected, response));
return;
}
}
throw new IllegalArgumentException("No requests available to node " + node);
}
- public void prepareResponse(Struct body) {
- prepareResponse(ALWAYS_TRUE, body, false);
+ public void prepareResponse(AbstractResponse response) {
+ prepareResponse(ALWAYS_TRUE, response, false);
}
- public void prepareResponseFrom(Struct body, Node node) {
- prepareResponseFrom(ALWAYS_TRUE, body, node, false);
+ public void prepareResponseFrom(AbstractResponse response, Node node) {
+ prepareResponseFrom(ALWAYS_TRUE, response, node, false);
}
/**
* Prepare a response for a request matching the provided matcher. If the matcher does not
- * match, {@link #send(ClientRequest, long)} will throw IllegalStateException
+ * match, {@link KafkaClient#send(ClientRequest, long)} will throw IllegalStateException
* @param matcher The matcher to apply
- * @param body The response body
+ * @param response The response body
*/
- public void prepareResponse(RequestMatcher matcher, Struct body) {
- prepareResponse(matcher, body, false);
+ public void prepareResponse(RequestMatcher matcher, AbstractResponse response) {
+ prepareResponse(matcher, response, false);
}
- public void prepareResponseFrom(RequestMatcher matcher, Struct body, Node node) {
- prepareResponseFrom(matcher, body, node, false);
+ public void prepareResponseFrom(RequestMatcher matcher, AbstractResponse response, Node node) {
+ prepareResponseFrom(matcher, response, node, false);
}
- public void prepareResponse(Struct body, boolean disconnected) {
- prepareResponse(ALWAYS_TRUE, body, disconnected);
+ public void prepareResponse(AbstractResponse response, boolean disconnected) {
+ prepareResponse(ALWAYS_TRUE, response, disconnected);
}
- public void prepareResponseFrom(Struct body, Node node, boolean disconnected) {
- prepareResponseFrom(ALWAYS_TRUE, body, node, disconnected);
+ public void prepareResponseFrom(AbstractResponse response, Node node, boolean disconnected) {
+ prepareResponseFrom(ALWAYS_TRUE, response, node, disconnected);
}
/**
* Prepare a response for a request matching the provided matcher. If the matcher does not
- * match, {@link #send(ClientRequest, long)} will throw IllegalStateException
+ * match, {@link KafkaClient#send(ClientRequest, long)} will throw IllegalStateException
* @param matcher The matcher to apply
- * @param body The response body
+ * @param response The response body
* @param disconnected Whether the request was disconnected
*/
- public void prepareResponse(RequestMatcher matcher, Struct body, boolean disconnected) {
- prepareResponseFrom(matcher, body, null, disconnected);
+ public void prepareResponse(RequestMatcher matcher, AbstractResponse response, boolean disconnected) {
+ prepareResponseFrom(matcher, response, null, disconnected);
}
- public void prepareResponseFrom(RequestMatcher matcher, Struct body, Node node, boolean disconnected) {
- futureResponses.add(new FutureResponse(body, disconnected, matcher, node));
+ public void prepareResponseFrom(RequestMatcher matcher, AbstractResponse response, Node node, boolean disconnected) {
+ futureResponses.add(new FutureResponse(response, disconnected, matcher, node));
}
public void reset() {
@@ -307,12 +310,12 @@ public class MockClient implements KafkaClient {
/**
* The RequestMatcher provides a way to match a particular request to a response prepared
- * through {@link #prepareResponse(RequestMatcher, Struct)}. Basically this allows testers
+ * through {@link #prepareResponse(RequestMatcher, AbstractResponse)}. Basically this allows testers
* to inspect the request body for the type of the request or for specific fields that should be set,
* and to fail the test if it doesn't match.
*/
public interface RequestMatcher {
- boolean matches(ClientRequest request);
+ boolean matches(AbstractRequest body);
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
index d305e8e..0966ee5 100644
--- a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
@@ -23,10 +23,10 @@ import org.apache.kafka.common.network.NetworkReceive;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.ProtoUtils;
import org.apache.kafka.common.protocol.types.Struct;
+import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.requests.MetadataRequest;
import org.apache.kafka.common.requests.ProduceRequest;
import org.apache.kafka.common.requests.RequestHeader;
-import org.apache.kafka.common.requests.RequestSend;
import org.apache.kafka.common.requests.ResponseHeader;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.test.MockSelector;
@@ -66,10 +66,9 @@ public class NetworkClientTest {
@Test(expected = IllegalStateException.class)
public void testSendToUnreadyNode() {
- RequestSend send = new RequestSend("5",
- client.nextRequestHeader(ApiKeys.METADATA),
- new MetadataRequest(Arrays.asList("test")).toStruct());
- ClientRequest request = new ClientRequest(time.milliseconds(), false, send, null);
+ MetadataRequest metadataRequest = new MetadataRequest(Arrays.asList("test"));
+ RequestHeader header = client.nextRequestHeader(ApiKeys.METADATA);
+ ClientRequest request = new ClientRequest("5", time.milliseconds(), false, header, metadataRequest, null);
client.send(request, time.milliseconds());
client.poll(1, time.milliseconds());
}
@@ -91,10 +90,9 @@ public class NetworkClientTest {
client.poll(1, time.milliseconds());
assertTrue("The client should be ready", client.isReady(node, time.milliseconds()));
- ProduceRequest produceRequest = new ProduceRequest((short) 1, 1000, Collections.<TopicPartition, ByteBuffer>emptyMap());
+ ProduceRequest produceRequest = new ProduceRequest((short) 1, 1000, Collections.<TopicPartition, MemoryRecords>emptyMap());
RequestHeader reqHeader = client.nextRequestHeader(ApiKeys.PRODUCE);
- RequestSend send = new RequestSend(node.idString(), reqHeader, produceRequest.toStruct());
- ClientRequest request = new ClientRequest(time.milliseconds(), true, send, null);
+ ClientRequest request = new ClientRequest(node.idString(), time.milliseconds(), true, reqHeader, produceRequest, null);
client.send(request, time.milliseconds());
assertEquals("There should be 1 in-flight request after send", 1, client.inFlightRequestCount(node.idString()));
@@ -104,11 +102,10 @@ public class NetworkClientTest {
}
private void checkSimpleRequestResponse(NetworkClient networkClient) {
- ProduceRequest produceRequest = new ProduceRequest((short) 1, 1000, Collections.<TopicPartition, ByteBuffer>emptyMap());
+ ProduceRequest produceRequest = new ProduceRequest((short) 1, 1000, Collections.<TopicPartition, MemoryRecords>emptyMap());
RequestHeader reqHeader = networkClient.nextRequestHeader(ApiKeys.PRODUCE);
- RequestSend send = new RequestSend(node.idString(), reqHeader, produceRequest.toStruct());
TestCallbackHandler handler = new TestCallbackHandler();
- ClientRequest request = new ClientRequest(time.milliseconds(), true, send, handler);
+ ClientRequest request = new ClientRequest(node.idString(), time.milliseconds(), true, reqHeader, produceRequest, handler);
awaitReady(networkClient, node);
networkClient.send(request, time.milliseconds());
networkClient.poll(1, time.milliseconds());
@@ -126,7 +123,7 @@ public class NetworkClientTest {
assertEquals(1, responses.size());
assertTrue("The handler should have executed.", handler.executed);
assertTrue("Should have a response body.", handler.response.hasResponse());
- assertEquals("Should be correlated to the original request", request, handler.response.request());
+ assertEquals("Should be correlated to the original request", request.header(), handler.response.requestHeader());
}
private void awaitReady(NetworkClient client, Node node) {
@@ -136,11 +133,10 @@ public class NetworkClientTest {
@Test
public void testRequestTimeout() {
- ProduceRequest produceRequest = new ProduceRequest((short) 1, 1000, Collections.<TopicPartition, ByteBuffer>emptyMap());
+ ProduceRequest produceRequest = new ProduceRequest((short) 1, 1000, Collections.<TopicPartition, MemoryRecords>emptyMap());
RequestHeader reqHeader = client.nextRequestHeader(ApiKeys.PRODUCE);
- RequestSend send = new RequestSend(node.idString(), reqHeader, produceRequest.toStruct());
TestCallbackHandler handler = new TestCallbackHandler();
- ClientRequest request = new ClientRequest(time.milliseconds(), true, send, handler);
+ ClientRequest request = new ClientRequest(node.idString(), time.milliseconds(), true, reqHeader, produceRequest, handler);
awaitReady(client, node);
long now = time.milliseconds();
client.send(request, now);
@@ -211,9 +207,8 @@ public class NetworkClientTest {
awaitReady(client, node);
RequestHeader reqHeader = client.nextRequestHeader(ApiKeys.METADATA);
- Struct req = new MetadataRequest(Collections.<String>emptyList()).toStruct();
- RequestSend send = new RequestSend(node.idString(), reqHeader, req);
- ClientRequest request = new ClientRequest(time.milliseconds(), true, send, null);
+ MetadataRequest metadataRequest = new MetadataRequest(Collections.<String>emptyList());
+ ClientRequest request = new ClientRequest(node.idString(), time.milliseconds(), true, reqHeader, metadataRequest, null);
client.send(request, time.milliseconds());
client.poll(requestTimeoutMs, time.milliseconds());
assertEquals(1, client.inFlightRequestCount(node.idString()));
http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index bf45ee6..fa30a4b 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -37,9 +37,9 @@ import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.network.Selectable;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
-import org.apache.kafka.common.protocol.types.Struct;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.requests.FetchResponse;
import org.apache.kafka.common.requests.FetchResponse.PartitionData;
import org.apache.kafka.common.requests.GroupCoordinatorResponse;
@@ -470,10 +470,10 @@ public class KafkaConsumerTest {
final KafkaConsumer<String, String> consumer = newConsumer(time, client, metadata, assignor,
rebalanceTimeoutMs, sessionTimeoutMs, heartbeatIntervalMs, true, autoCommitIntervalMs);
- consumer.assign(Arrays.asList(tp0));
+ consumer.assign(singletonList(tp0));
// lookup coordinator
- client.prepareResponseFrom(new GroupCoordinatorResponse(Errors.NONE.code(), node).toStruct(), node);
+ client.prepareResponseFrom(new GroupCoordinatorResponse(Errors.NONE.code(), node), node);
Node coordinator = new Node(Integer.MAX_VALUE - node.id(), node.host(), node.port());
// fetch offset for one topic
@@ -892,7 +892,7 @@ public class KafkaConsumerTest {
// the auto commit is disabled, so no offset commit request should be sent
for (ClientRequest req: client.requests())
- assertTrue(req.request().header().apiKey() != ApiKeys.OFFSET_COMMIT.id);
+ assertTrue(req.header().apiKey() != ApiKeys.OFFSET_COMMIT.id);
// subscription change
consumer.unsubscribe();
@@ -903,7 +903,7 @@ public class KafkaConsumerTest {
// the auto commit is disabled, so no offset commit request should be sent
for (ClientRequest req: client.requests())
- assertTrue(req.request().header().apiKey() != ApiKeys.OFFSET_COMMIT.id);
+ assertTrue(req.header().apiKey() != ApiKeys.OFFSET_COMMIT.id);
consumer.close();
}
@@ -933,7 +933,7 @@ public class KafkaConsumerTest {
rebalanceTimeoutMs, sessionTimeoutMs, heartbeatIntervalMs, true, autoCommitIntervalMs);
// lookup coordinator
- client.prepareResponseFrom(new GroupCoordinatorResponse(Errors.NONE.code(), node).toStruct(), node);
+ client.prepareResponseFrom(new GroupCoordinatorResponse(Errors.NONE.code(), node), node);
Node coordinator = new Node(Integer.MAX_VALUE - node.id(), node.host(), node.port());
// manual assignment
@@ -997,7 +997,7 @@ public class KafkaConsumerTest {
rebalanceTimeoutMs, sessionTimeoutMs, heartbeatIntervalMs, false, autoCommitIntervalMs);
// lookup coordinator
- client.prepareResponseFrom(new GroupCoordinatorResponse(Errors.NONE.code(), node).toStruct(), node);
+ client.prepareResponseFrom(new GroupCoordinatorResponse(Errors.NONE.code(), node), node);
Node coordinator = new Node(Integer.MAX_VALUE - node.id(), node.host(), node.port());
// manual assignment
@@ -1029,8 +1029,8 @@ public class KafkaConsumerTest {
assertTrue(consumer.assignment().equals(Collections.singleton(t2p0)));
// the auto commit is disabled, so no offset commit request should be sent
- for (ClientRequest req: client.requests())
- assertTrue(req.request().header().apiKey() != ApiKeys.OFFSET_COMMIT.id);
+ for (ClientRequest req : client.requests())
+ assertTrue(req.header().apiKey() != ApiKeys.OFFSET_COMMIT.id);
consumer.close();
}
@@ -1086,15 +1086,15 @@ public class KafkaConsumerTest {
private Node prepareRebalance(MockClient client, Node node, final Set<String> subscribedTopics, PartitionAssignor assignor, List<TopicPartition> partitions, Node coordinator) {
if (coordinator == null) {
// lookup coordinator
- client.prepareResponseFrom(new GroupCoordinatorResponse(Errors.NONE.code(), node).toStruct(), node);
+ client.prepareResponseFrom(new GroupCoordinatorResponse(Errors.NONE.code(), node), node);
coordinator = new Node(Integer.MAX_VALUE - node.id(), node.host(), node.port());
}
// join group
client.prepareResponseFrom(new MockClient.RequestMatcher() {
@Override
- public boolean matches(ClientRequest request) {
- JoinGroupRequest joinGroupRequest = new JoinGroupRequest(request.request().body());
+ public boolean matches(AbstractRequest body) {
+ JoinGroupRequest joinGroupRequest = (JoinGroupRequest) body;
PartitionAssignor.Subscription subscription = ConsumerProtocol.deserializeSubscription(joinGroupRequest.groupProtocols().get(0).metadata());
return subscribedTopics.equals(new HashSet<>(subscription.topics()));
}
@@ -1109,7 +1109,7 @@ public class KafkaConsumerTest {
private Node prepareRebalance(MockClient client, Node node, PartitionAssignor assignor, List<TopicPartition> partitions, Node coordinator) {
if (coordinator == null) {
// lookup coordinator
- client.prepareResponseFrom(new GroupCoordinatorResponse(Errors.NONE.code(), node).toStruct(), node);
+ client.prepareResponseFrom(new GroupCoordinatorResponse(Errors.NONE.code(), node), node);
coordinator = new Node(Integer.MAX_VALUE - node.id(), node.host(), node.port());
}
@@ -1126,11 +1126,11 @@ public class KafkaConsumerTest {
final AtomicBoolean heartbeatReceived = new AtomicBoolean(false);
client.prepareResponseFrom(new MockClient.RequestMatcher() {
@Override
- public boolean matches(ClientRequest request) {
+ public boolean matches(AbstractRequest body) {
heartbeatReceived.set(true);
return true;
}
- }, new HeartbeatResponse(Errors.NONE.code()).toStruct(), coordinator);
+ }, new HeartbeatResponse(Errors.NONE.code()), coordinator);
return heartbeatReceived;
}
@@ -1142,8 +1142,8 @@ public class KafkaConsumerTest {
client.prepareResponseFrom(new MockClient.RequestMatcher() {
@Override
- public boolean matches(ClientRequest request) {
- OffsetCommitRequest commitRequest = new OffsetCommitRequest(request.request().body());
+ public boolean matches(AbstractRequest body) {
+ OffsetCommitRequest commitRequest = (OffsetCommitRequest) body;
for (Map.Entry<TopicPartition, Long> partitionOffset : partitionOffsets.entrySet()) {
OffsetCommitRequest.PartitionData partitionData = commitRequest.offsetData().get(partitionOffset.getKey());
// verify that the expected offset has been committed
@@ -1162,39 +1162,38 @@ public class KafkaConsumerTest {
return prepareOffsetCommitResponse(client, coordinator, Collections.singletonMap(partition, offset));
}
- private Struct offsetCommitResponse(Map<TopicPartition, Short> responseData) {
- OffsetCommitResponse response = new OffsetCommitResponse(responseData);
- return response.toStruct();
+ private OffsetCommitResponse offsetCommitResponse(Map<TopicPartition, Short> responseData) {
+ return new OffsetCommitResponse(responseData);
}
- private Struct joinGroupFollowerResponse(PartitionAssignor assignor, int generationId, String memberId, String leaderId, short error) {
+ private JoinGroupResponse joinGroupFollowerResponse(PartitionAssignor assignor, int generationId, String memberId, String leaderId, short error) {
return new JoinGroupResponse(error, generationId, assignor.name(), memberId, leaderId,
- Collections.<String, ByteBuffer>emptyMap()).toStruct();
+ Collections.<String, ByteBuffer>emptyMap());
}
- private Struct syncGroupResponse(List<TopicPartition> partitions, short error) {
+ private SyncGroupResponse syncGroupResponse(List<TopicPartition> partitions, short error) {
ByteBuffer buf = ConsumerProtocol.serializeAssignment(new PartitionAssignor.Assignment(partitions));
- return new SyncGroupResponse(error, buf).toStruct();
+ return new SyncGroupResponse(error, buf);
}
- private Struct offsetResponse(Map<TopicPartition, Long> offsets, short error) {
+ private OffsetFetchResponse offsetResponse(Map<TopicPartition, Long> offsets, short error) {
Map<TopicPartition, OffsetFetchResponse.PartitionData> partitionData = new HashMap<>();
for (Map.Entry<TopicPartition, Long> entry : offsets.entrySet()) {
partitionData.put(entry.getKey(), new OffsetFetchResponse.PartitionData(entry.getValue(), "", error));
}
- return new OffsetFetchResponse(partitionData).toStruct();
+ return new OffsetFetchResponse(partitionData);
}
- private Struct listOffsetsResponse(Map<TopicPartition, Long> offsets, short error) {
+ private ListOffsetResponse listOffsetsResponse(Map<TopicPartition, Long> offsets, short error) {
Map<TopicPartition, ListOffsetResponse.PartitionData> partitionData = new HashMap<>();
for (Map.Entry<TopicPartition, Long> partitionOffset : offsets.entrySet()) {
partitionData.put(partitionOffset.getKey(), new ListOffsetResponse.PartitionData(error,
1L, partitionOffset.getValue()));
}
- return new ListOffsetResponse(partitionData, 1).toStruct();
+ return new ListOffsetResponse(partitionData, 1);
}
- private Struct fetchResponse(Map<TopicPartition, FetchInfo> fetches) {
+ private FetchResponse fetchResponse(Map<TopicPartition, FetchInfo> fetches) {
Map<TopicPartition, PartitionData> tpResponses = new HashMap<>();
for (Map.Entry<TopicPartition, FetchInfo> fetchEntry : fetches.entrySet()) {
TopicPartition partition = fetchEntry.getKey();
@@ -1204,13 +1203,12 @@ public class KafkaConsumerTest {
for (int i = 0; i < fetchCount; i++)
records.append(fetchOffset + i, 0L, ("key-" + i).getBytes(), ("value-" + i).getBytes());
records.close();
- tpResponses.put(partition, new FetchResponse.PartitionData(Errors.NONE.code(), 0, records.buffer()));
+ tpResponses.put(partition, new FetchResponse.PartitionData(Errors.NONE.code(), 0, records));
}
- FetchResponse response = new FetchResponse(tpResponses, 0);
- return response.toStruct();
+ return new FetchResponse(tpResponses, 0);
}
- private Struct fetchResponse(TopicPartition partition, long fetchOffset, int count) {
+ private FetchResponse fetchResponse(TopicPartition partition, long fetchOffset, int count) {
FetchInfo fetchInfo = new FetchInfo(fetchOffset, count);
return fetchResponse(Collections.singletonMap(partition, fetchInfo));
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
index 3c8c793..bb617ae 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
@@ -16,20 +16,20 @@
**/
package org.apache.kafka.clients.consumer.internals;
-import org.apache.kafka.clients.ClientRequest;
import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.MockClient;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.metrics.Metrics;
-import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
-import org.apache.kafka.common.protocol.types.Struct;
+import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.requests.GroupCoordinatorResponse;
+import org.apache.kafka.common.requests.HeartbeatRequest;
import org.apache.kafka.common.requests.HeartbeatResponse;
import org.apache.kafka.common.requests.JoinGroupRequest;
import org.apache.kafka.common.requests.JoinGroupResponse;
+import org.apache.kafka.common.requests.SyncGroupRequest;
import org.apache.kafka.common.requests.SyncGroupResponse;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
@@ -114,8 +114,8 @@ public class AbstractCoordinatorTest {
// raise the error when the background thread tries to send a heartbeat
mockClient.prepareResponse(new MockClient.RequestMatcher() {
@Override
- public boolean matches(ClientRequest request) {
- if (request.request().header().apiKey() == ApiKeys.HEARTBEAT.id)
+ public boolean matches(AbstractRequest body) {
+ if (body instanceof HeartbeatRequest)
throw e;
return false;
}
@@ -160,9 +160,9 @@ public class AbstractCoordinatorTest {
mockClient.prepareResponse(new MockClient.RequestMatcher() {
private int invocations = 0;
@Override
- public boolean matches(ClientRequest request) {
+ public boolean matches(AbstractRequest body) {
invocations++;
- boolean isJoinGroupRequest = request.request().header().apiKey() == ApiKeys.JOIN_GROUP.id;
+ boolean isJoinGroupRequest = body instanceof JoinGroupRequest;
if (isJoinGroupRequest && invocations == 1)
// simulate wakeup before the request returns
throw new WakeupException();
@@ -196,9 +196,9 @@ public class AbstractCoordinatorTest {
mockClient.prepareResponse(new MockClient.RequestMatcher() {
private int invocations = 0;
@Override
- public boolean matches(ClientRequest request) {
+ public boolean matches(AbstractRequest body) {
invocations++;
- boolean isJoinGroupRequest = request.request().header().apiKey() == ApiKeys.JOIN_GROUP.id;
+ boolean isJoinGroupRequest = body instanceof JoinGroupRequest;
if (isJoinGroupRequest && invocations == 1)
// simulate wakeup before the request returns
throw new WakeupException();
@@ -233,8 +233,8 @@ public class AbstractCoordinatorTest {
mockClient.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
mockClient.prepareResponse(new MockClient.RequestMatcher() {
@Override
- public boolean matches(ClientRequest request) {
- boolean isJoinGroupRequest = request.request().header().apiKey() == ApiKeys.JOIN_GROUP.id;
+ public boolean matches(AbstractRequest body) {
+ boolean isJoinGroupRequest = body instanceof JoinGroupRequest;
if (isJoinGroupRequest)
// wakeup after the request returns
consumerClient.wakeup();
@@ -267,8 +267,8 @@ public class AbstractCoordinatorTest {
mockClient.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
mockClient.prepareResponse(new MockClient.RequestMatcher() {
@Override
- public boolean matches(ClientRequest request) {
- boolean isJoinGroupRequest = request.request().header().apiKey() == ApiKeys.JOIN_GROUP.id;
+ public boolean matches(AbstractRequest body) {
+ boolean isJoinGroupRequest = body instanceof JoinGroupRequest;
if (isJoinGroupRequest)
// wakeup after the request returns
consumerClient.wakeup();
@@ -305,9 +305,9 @@ public class AbstractCoordinatorTest {
mockClient.prepareResponse(new MockClient.RequestMatcher() {
private int invocations = 0;
@Override
- public boolean matches(ClientRequest request) {
+ public boolean matches(AbstractRequest body) {
invocations++;
- boolean isSyncGroupRequest = request.request().header().apiKey() == ApiKeys.SYNC_GROUP.id;
+ boolean isSyncGroupRequest = body instanceof SyncGroupRequest;
if (isSyncGroupRequest && invocations == 1)
// simulate wakeup after the request sent
throw new WakeupException();
@@ -341,9 +341,9 @@ public class AbstractCoordinatorTest {
mockClient.prepareResponse(new MockClient.RequestMatcher() {
private int invocations = 0;
@Override
- public boolean matches(ClientRequest request) {
+ public boolean matches(AbstractRequest body) {
invocations++;
- boolean isSyncGroupRequest = request.request().header().apiKey() == ApiKeys.SYNC_GROUP.id;
+ boolean isSyncGroupRequest = body instanceof SyncGroupRequest;
if (isSyncGroupRequest && invocations == 1)
// simulate wakeup after the request sent
throw new WakeupException();
@@ -378,8 +378,8 @@ public class AbstractCoordinatorTest {
mockClient.prepareResponse(joinGroupFollowerResponse(1, "memberId", "leaderId", Errors.NONE));
mockClient.prepareResponse(new MockClient.RequestMatcher() {
@Override
- public boolean matches(ClientRequest request) {
- boolean isSyncGroupRequest = request.request().header().apiKey() == ApiKeys.SYNC_GROUP.id;
+ public boolean matches(AbstractRequest body) {
+ boolean isSyncGroupRequest = body instanceof SyncGroupRequest;
if (isSyncGroupRequest)
// wakeup after the request returns
consumerClient.wakeup();
@@ -412,8 +412,8 @@ public class AbstractCoordinatorTest {
mockClient.prepareResponse(joinGroupFollowerResponse(1, "memberId", "leaderId", Errors.NONE));
mockClient.prepareResponse(new MockClient.RequestMatcher() {
@Override
- public boolean matches(ClientRequest request) {
- boolean isSyncGroupRequest = request.request().header().apiKey() == ApiKeys.SYNC_GROUP.id;
+ public boolean matches(AbstractRequest body) {
+ boolean isSyncGroupRequest = body instanceof SyncGroupRequest;
if (isSyncGroupRequest)
// wakeup after the request returns
consumerClient.wakeup();
@@ -446,8 +446,8 @@ public class AbstractCoordinatorTest {
final AtomicBoolean heartbeatReceived = new AtomicBoolean(false);
mockClient.prepareResponse(new MockClient.RequestMatcher() {
@Override
- public boolean matches(ClientRequest request) {
- boolean isHeartbeatRequest = request.request().header().apiKey() == ApiKeys.HEARTBEAT.id;
+ public boolean matches(AbstractRequest body) {
+ boolean isHeartbeatRequest = body instanceof HeartbeatRequest;
if (isHeartbeatRequest)
heartbeatReceived.set(true);
return isHeartbeatRequest;
@@ -466,23 +466,21 @@ public class AbstractCoordinatorTest {
}, 3000, "Should have received a heartbeat request after joining the group");
}
- private Struct groupCoordinatorResponse(Node node, Errors error) {
- GroupCoordinatorResponse response = new GroupCoordinatorResponse(error.code(), node);
- return response.toStruct();
+ private GroupCoordinatorResponse groupCoordinatorResponse(Node node, Errors error) {
+ return new GroupCoordinatorResponse(error.code(), node);
}
- private Struct heartbeatResponse(Errors error) {
- HeartbeatResponse response = new HeartbeatResponse(error.code());
- return response.toStruct();
+ private HeartbeatResponse heartbeatResponse(Errors error) {
+ return new HeartbeatResponse(error.code());
}
- private Struct joinGroupFollowerResponse(int generationId, String memberId, String leaderId, Errors error) {
+ private JoinGroupResponse joinGroupFollowerResponse(int generationId, String memberId, String leaderId, Errors error) {
return new JoinGroupResponse(error.code(), generationId, "dummy-subprotocol", memberId, leaderId,
- Collections.<String, ByteBuffer>emptyMap()).toStruct();
+ Collections.<String, ByteBuffer>emptyMap());
}
- private Struct syncGroupResponse(Errors error) {
- return new SyncGroupResponse(error.code(), ByteBuffer.allocate(0)).toStruct();
+ private SyncGroupResponse syncGroupResponse(Errors error) {
+ return new SyncGroupResponse(error.code(), ByteBuffer.allocate(0));
}
public class DummyCoordinator extends AbstractCoordinator {
http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
index a83d7a9..e7ba401 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
@@ -16,7 +16,6 @@
*/
package org.apache.kafka.clients.consumer.internals;
-import org.apache.kafka.clients.ClientRequest;
import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.MockClient;
import org.apache.kafka.clients.consumer.CommitFailedException;
@@ -39,7 +38,7 @@ import org.apache.kafka.common.errors.OffsetMetadataTooLarge;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.protocol.Errors;
-import org.apache.kafka.common.protocol.types.Struct;
+import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.requests.GroupCoordinatorResponse;
import org.apache.kafka.common.requests.HeartbeatResponse;
import org.apache.kafka.common.requests.JoinGroupRequest;
@@ -311,8 +310,8 @@ public class ConsumerCoordinatorTest {
client.prepareResponse(joinGroupLeaderResponse(1, consumerId, memberSubscriptions, Errors.NONE.code()));
client.prepareResponse(new MockClient.RequestMatcher() {
@Override
- public boolean matches(ClientRequest request) {
- SyncGroupRequest sync = new SyncGroupRequest(request.request().body());
+ public boolean matches(AbstractRequest body) {
+ SyncGroupRequest sync = (SyncGroupRequest) body;
return sync.memberId().equals(consumerId) &&
sync.generationId() == 1 &&
sync.groupAssignment().containsKey(consumerId);
@@ -353,7 +352,7 @@ public class ConsumerCoordinatorTest {
client.prepareResponse(joinGroupLeaderResponse(1, consumerId, initialSubscription, Errors.NONE.code()));
client.prepareResponse(new MockClient.RequestMatcher() {
@Override
- public boolean matches(ClientRequest request) {
+ public boolean matches(AbstractRequest body) {
final Map<String, Integer> updatedPartitions = new HashMap<>();
for (String topic : updatedSubscription)
updatedPartitions.put(topic, 1);
@@ -371,8 +370,8 @@ public class ConsumerCoordinatorTest {
// we expect to see a second rebalance with the new-found topics
client.prepareResponse(new MockClient.RequestMatcher() {
@Override
- public boolean matches(ClientRequest request) {
- JoinGroupRequest join = new JoinGroupRequest(request.request().body());
+ public boolean matches(AbstractRequest body) {
+ JoinGroupRequest join = (JoinGroupRequest) body;
ProtocolMetadata protocolMetadata = join.groupProtocols().iterator().next();
PartitionAssignor.Subscription subscription = ConsumerProtocol.deserializeSubscription(protocolMetadata.metadata());
protocolMetadata.metadata().rewind();
@@ -443,8 +442,8 @@ public class ConsumerCoordinatorTest {
client.prepareResponse(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NONE.code()));
client.prepareResponse(new MockClient.RequestMatcher() {
@Override
- public boolean matches(ClientRequest request) {
- SyncGroupRequest sync = new SyncGroupRequest(request.request().body());
+ public boolean matches(AbstractRequest body) {
+ SyncGroupRequest sync = (SyncGroupRequest) body;
return sync.memberId().equals(consumerId) &&
sync.generationId() == 1 &&
sync.groupAssignment().isEmpty();
@@ -476,13 +475,13 @@ public class ConsumerCoordinatorTest {
final AtomicBoolean received = new AtomicBoolean(false);
client.prepareResponse(new MockClient.RequestMatcher() {
@Override
- public boolean matches(ClientRequest request) {
+ public boolean matches(AbstractRequest body) {
received.set(true);
- LeaveGroupRequest leaveRequest = new LeaveGroupRequest(request.request().body());
+ LeaveGroupRequest leaveRequest = (LeaveGroupRequest) body;
return leaveRequest.memberId().equals(consumerId) &&
leaveRequest.groupId().equals(groupId);
}
- }, new LeaveGroupResponse(Errors.NONE.code()).toStruct());
+ }, new LeaveGroupResponse(Errors.NONE.code()));
coordinator.close();
assertTrue(received.get());
}
@@ -503,13 +502,13 @@ public class ConsumerCoordinatorTest {
final AtomicBoolean received = new AtomicBoolean(false);
client.prepareResponse(new MockClient.RequestMatcher() {
@Override
- public boolean matches(ClientRequest request) {
+ public boolean matches(AbstractRequest body) {
received.set(true);
- LeaveGroupRequest leaveRequest = new LeaveGroupRequest(request.request().body());
+ LeaveGroupRequest leaveRequest = (LeaveGroupRequest) body;
return leaveRequest.memberId().equals(consumerId) &&
leaveRequest.groupId().equals(groupId);
}
- }, new LeaveGroupResponse(Errors.NONE.code()).toStruct());
+ }, new LeaveGroupResponse(Errors.NONE.code()));
coordinator.maybeLeaveGroup();
assertTrue(received.get());
@@ -548,8 +547,8 @@ public class ConsumerCoordinatorTest {
// now we should see a new join with the empty UNKNOWN_MEMBER_ID
client.prepareResponse(new MockClient.RequestMatcher() {
@Override
- public boolean matches(ClientRequest request) {
- JoinGroupRequest joinRequest = new JoinGroupRequest(request.request().body());
+ public boolean matches(AbstractRequest body) {
+ JoinGroupRequest joinRequest = (JoinGroupRequest) body;
return joinRequest.memberId().equals(JoinGroupRequest.UNKNOWN_MEMBER_ID);
}
}, joinGroupFollowerResponse(2, consumerId, "leader", Errors.NONE.code()));
@@ -600,8 +599,8 @@ public class ConsumerCoordinatorTest {
// then let the full join/sync finish successfully
client.prepareResponse(new MockClient.RequestMatcher() {
@Override
- public boolean matches(ClientRequest request) {
- JoinGroupRequest joinRequest = new JoinGroupRequest(request.request().body());
+ public boolean matches(AbstractRequest body) {
+ JoinGroupRequest joinRequest = (JoinGroupRequest) body;
return joinRequest.memberId().equals(JoinGroupRequest.UNKNOWN_MEMBER_ID);
}
}, joinGroupFollowerResponse(2, consumerId, "leader", Errors.NONE.code()));
@@ -670,8 +669,8 @@ public class ConsumerCoordinatorTest {
client.prepareResponse(joinGroupLeaderResponse(1, consumerId, memberSubscriptions, Errors.NONE.code()));
client.prepareResponse(new MockClient.RequestMatcher() {
@Override
- public boolean matches(ClientRequest request) {
- SyncGroupRequest sync = new SyncGroupRequest(request.request().body());
+ public boolean matches(AbstractRequest body) {
+ SyncGroupRequest sync = (SyncGroupRequest) body;
if (sync.memberId().equals(consumerId) &&
sync.generationId() == 1 &&
sync.groupAssignment().containsKey(consumerId)) {
@@ -939,7 +938,7 @@ public class ConsumerCoordinatorTest {
coordinator.joinGroupIfNeeded();
// now switch to manual assignment
- client.prepareResponse(new LeaveGroupResponse(Errors.NONE.code()).toStruct());
+ client.prepareResponse(new LeaveGroupResponse(Errors.NONE.code()));
subscriptions.unsubscribe();
coordinator.maybeLeaveGroup();
subscriptions.assignFromUser(singleton(tp));
@@ -947,8 +946,8 @@ public class ConsumerCoordinatorTest {
// the client should not reuse generation/memberId from auto-subscribed generation
client.prepareResponse(new MockClient.RequestMatcher() {
@Override
- public boolean matches(ClientRequest request) {
- OffsetCommitRequest commitRequest = new OffsetCommitRequest(request.request().body());
+ public boolean matches(AbstractRequest body) {
+ OffsetCommitRequest commitRequest = (OffsetCommitRequest) body;
return commitRequest.memberId().equals(OffsetCommitRequest.DEFAULT_MEMBER_ID) &&
commitRequest.generationId() == OffsetCommitRequest.DEFAULT_GENERATION_ID;
}
@@ -1245,48 +1244,44 @@ public class ConsumerCoordinatorTest {
excludeInternalTopics);
}
- private Struct groupCoordinatorResponse(Node node, short error) {
- GroupCoordinatorResponse response = new GroupCoordinatorResponse(error, node);
- return response.toStruct();
+ private GroupCoordinatorResponse groupCoordinatorResponse(Node node, short error) {
+ return new GroupCoordinatorResponse(error, node);
}
- private Struct heartbeatResponse(short error) {
- HeartbeatResponse response = new HeartbeatResponse(error);
- return response.toStruct();
+ private HeartbeatResponse heartbeatResponse(short error) {
+ return new HeartbeatResponse(error);
}
- private Struct joinGroupLeaderResponse(int generationId,
- String memberId,
- Map<String, List<String>> subscriptions,
- short error) {
+ private JoinGroupResponse joinGroupLeaderResponse(int generationId,
+ String memberId,
+ Map<String, List<String>> subscriptions,
+ short error) {
Map<String, ByteBuffer> metadata = new HashMap<>();
for (Map.Entry<String, List<String>> subscriptionEntry : subscriptions.entrySet()) {
PartitionAssignor.Subscription subscription = new PartitionAssignor.Subscription(subscriptionEntry.getValue());
ByteBuffer buf = ConsumerProtocol.serializeSubscription(subscription);
metadata.put(subscriptionEntry.getKey(), buf);
}
- return new JoinGroupResponse(error, generationId, partitionAssignor.name(), memberId, memberId, metadata).toStruct();
+ return new JoinGroupResponse(error, generationId, partitionAssignor.name(), memberId, memberId, metadata);
}
- private Struct joinGroupFollowerResponse(int generationId, String memberId, String leaderId, short error) {
+ private JoinGroupResponse joinGroupFollowerResponse(int generationId, String memberId, String leaderId, short error) {
return new JoinGroupResponse(error, generationId, partitionAssignor.name(), memberId, leaderId,
- Collections.<String, ByteBuffer>emptyMap()).toStruct();
+ Collections.<String, ByteBuffer>emptyMap());
}
- private Struct syncGroupResponse(List<TopicPartition> partitions, short error) {
+ private SyncGroupResponse syncGroupResponse(List<TopicPartition> partitions, short error) {
ByteBuffer buf = ConsumerProtocol.serializeAssignment(new PartitionAssignor.Assignment(partitions));
- return new SyncGroupResponse(error, buf).toStruct();
+ return new SyncGroupResponse(error, buf);
}
- private Struct offsetCommitResponse(Map<TopicPartition, Short> responseData) {
- OffsetCommitResponse response = new OffsetCommitResponse(responseData);
- return response.toStruct();
+ private OffsetCommitResponse offsetCommitResponse(Map<TopicPartition, Short> responseData) {
+ return new OffsetCommitResponse(responseData);
}
- private Struct offsetFetchResponse(TopicPartition tp, Short error, String metadata, long offset) {
+ private OffsetFetchResponse offsetFetchResponse(TopicPartition tp, Short error, String metadata, long offset) {
OffsetFetchResponse.PartitionData data = new OffsetFetchResponse.PartitionData(offset, metadata, error);
- OffsetFetchResponse response = new OffsetFetchResponse(Collections.singletonMap(tp, data));
- return response.toStruct();
+ return new OffsetFetchResponse(Collections.singletonMap(tp, data));
}
private OffsetCommitCallback callback(final AtomicBoolean success) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java
index f8ad3ca..003d92d 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java
@@ -21,7 +21,6 @@ import org.apache.kafka.common.Node;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
-import org.apache.kafka.common.protocol.types.Struct;
import org.apache.kafka.common.requests.HeartbeatRequest;
import org.apache.kafka.common.requests.HeartbeatResponse;
import org.apache.kafka.common.utils.MockTime;
@@ -60,7 +59,7 @@ public class ConsumerNetworkClientTest {
assertTrue(future.succeeded());
ClientResponse clientResponse = future.value();
- HeartbeatResponse response = new HeartbeatResponse(clientResponse.responseBody());
+ HeartbeatResponse response = (HeartbeatResponse) clientResponse.responseBody();
assertEquals(Errors.NONE.code(), response.errorCode());
}
@@ -206,7 +205,7 @@ public class ConsumerNetworkClientTest {
client.prepareResponse(heartbeatResponse(Errors.NONE.code()));
consumerClient.poll(future2);
ClientResponse clientResponse = future2.value();
- HeartbeatResponse response = new HeartbeatResponse(clientResponse.responseBody());
+ HeartbeatResponse response = (HeartbeatResponse) clientResponse.responseBody();
assertEquals(Errors.NONE.code(), response.errorCode());
// Disable ready flag to delay send and queue another send. Disconnection should remove pending send
@@ -226,9 +225,8 @@ public class ConsumerNetworkClientTest {
return new HeartbeatRequest("group", 1, "memberId");
}
- private Struct heartbeatResponse(short error) {
- HeartbeatResponse response = new HeartbeatResponse(error);
- return response.toStruct();
+ private HeartbeatResponse heartbeatResponse(short error) {
+ return new HeartbeatResponse(error);
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index 5822646..140e041 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -16,12 +16,12 @@
*/
package org.apache.kafka.clients.consumer.internals;
-import org.apache.kafka.clients.ClientRequest;
import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.MockClient;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.clients.consumer.OffsetOutOfRangeException;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.Cluster;
@@ -37,12 +37,11 @@ import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.protocol.Errors;
-import org.apache.kafka.common.protocol.types.Struct;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.Compressor;
import org.apache.kafka.common.record.MemoryRecords;
-import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.record.Record;
+import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.requests.FetchRequest;
import org.apache.kafka.common.requests.FetchResponse;
import org.apache.kafka.common.requests.ListOffsetRequest;
@@ -166,8 +165,8 @@ public class FetcherTest {
private MockClient.RequestMatcher matchesOffset(final TopicPartition tp, final long offset) {
return new MockClient.RequestMatcher() {
@Override
- public boolean matches(ClientRequest request) {
- FetchRequest fetch = new FetchRequest(request.request().body());
+ public boolean matches(AbstractRequest body) {
+ FetchRequest fetch = (FetchRequest) body;
return fetch.fetchData().containsKey(tp) &&
fetch.fetchData().get(tp).offset == offset;
}
@@ -542,7 +541,7 @@ public class FetcherTest {
@Test
public void testGetAllTopics() {
// sending response before request, as getTopicMetadata is a blocking call
- client.prepareResponse(newMetadataResponse(topicName, Errors.NONE).toStruct());
+ client.prepareResponse(newMetadataResponse(topicName, Errors.NONE));
Map<String, List<PartitionInfo>> allTopics = fetcher.getAllTopicMetadata(5000L);
@@ -553,7 +552,7 @@ public class FetcherTest {
public void testGetAllTopicsDisconnect() {
// first try gets a disconnect, next succeeds
client.prepareResponse(null, true);
- client.prepareResponse(newMetadataResponse(topicName, Errors.NONE).toStruct());
+ client.prepareResponse(newMetadataResponse(topicName, Errors.NONE));
Map<String, List<PartitionInfo>> allTopics = fetcher.getAllTopicMetadata(5000L);
assertEquals(cluster.topics().size(), allTopics.size());
}
@@ -566,7 +565,7 @@ public class FetcherTest {
@Test
public void testGetAllTopicsUnauthorized() {
- client.prepareResponse(newMetadataResponse(topicName, Errors.TOPIC_AUTHORIZATION_FAILED).toStruct());
+ client.prepareResponse(newMetadataResponse(topicName, Errors.TOPIC_AUTHORIZATION_FAILED));
try {
fetcher.getAllTopicMetadata(10L);
fail();
@@ -577,13 +576,13 @@ public class FetcherTest {
@Test(expected = InvalidTopicException.class)
public void testGetTopicMetadataInvalidTopic() {
- client.prepareResponse(newMetadataResponse(topicName, Errors.INVALID_TOPIC_EXCEPTION).toStruct());
+ client.prepareResponse(newMetadataResponse(topicName, Errors.INVALID_TOPIC_EXCEPTION));
fetcher.getTopicMetadata(new MetadataRequest(Collections.singletonList(topicName)), 5000L);
}
@Test
public void testGetTopicMetadataUnknownTopic() {
- client.prepareResponse(newMetadataResponse(topicName, Errors.UNKNOWN_TOPIC_OR_PARTITION).toStruct());
+ client.prepareResponse(newMetadataResponse(topicName, Errors.UNKNOWN_TOPIC_OR_PARTITION));
Map<String, List<PartitionInfo>> topicMetadata = fetcher.getTopicMetadata(new MetadataRequest(Collections.singletonList(topicName)), 5000L);
assertNull(topicMetadata.get(topicName));
@@ -591,8 +590,8 @@ public class FetcherTest {
@Test
public void testGetTopicMetadataLeaderNotAvailable() {
- client.prepareResponse(newMetadataResponse(topicName, Errors.LEADER_NOT_AVAILABLE).toStruct());
- client.prepareResponse(newMetadataResponse(topicName, Errors.NONE).toStruct());
+ client.prepareResponse(newMetadataResponse(topicName, Errors.LEADER_NOT_AVAILABLE));
+ client.prepareResponse(newMetadataResponse(topicName, Errors.NONE));
Map<String, List<PartitionInfo>> topicMetadata = fetcher.getTopicMetadata(new MetadataRequest(Collections.singletonList(topicName)), 5000L);
assertTrue(topicMetadata.containsKey(topicName));
@@ -705,28 +704,27 @@ public class FetcherTest {
// matches any list offset request with the provided timestamp
return new MockClient.RequestMatcher() {
@Override
- public boolean matches(ClientRequest request) {
- ListOffsetRequest req = new ListOffsetRequest(request.request().body());
+ public boolean matches(AbstractRequest body) {
+ ListOffsetRequest req = (ListOffsetRequest) body;
return timestamp == req.partitionTimestamps().get(tp);
}
};
}
- private Struct listOffsetResponse(Errors error, long timestamp, long offset) {
+ private ListOffsetResponse listOffsetResponse(Errors error, long timestamp, long offset) {
return listOffsetResponse(tp, error, timestamp, offset);
}
- private Struct listOffsetResponse(TopicPartition tp, Errors error, long timestamp, long offset) {
+ private ListOffsetResponse listOffsetResponse(TopicPartition tp, Errors error, long timestamp, long offset) {
ListOffsetResponse.PartitionData partitionData = new ListOffsetResponse.PartitionData(error.code(), timestamp, offset);
Map<TopicPartition, ListOffsetResponse.PartitionData> allPartitionData = new HashMap<>();
allPartitionData.put(tp, partitionData);
- ListOffsetResponse response = new ListOffsetResponse(allPartitionData, 1);
- return response.toStruct();
+ return new ListOffsetResponse(allPartitionData, 1);
}
- private Struct fetchResponse(ByteBuffer buffer, short error, long hw, int throttleTime) {
- FetchResponse response = new FetchResponse(Collections.singletonMap(tp, new FetchResponse.PartitionData(error, hw, buffer)), throttleTime);
- return response.toStruct();
+ private FetchResponse fetchResponse(ByteBuffer buffer, short error, long hw, int throttleTime) {
+ MemoryRecords records = MemoryRecords.readableRecords(buffer);
+ return new FetchResponse(Collections.singletonMap(tp, new FetchResponse.PartitionData(error, hw, records)), throttleTime);
}
private MetadataResponse newMetadataResponse(String topic, Errors error) {