You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2015/08/27 00:16:43 UTC
kafka git commit: KAFKA-2072: Add StopReplica request/response to
o.a.k.common.requests
Repository: kafka
Updated Branches:
refs/heads/trunk 8b538d62b -> 03f850f67
KAFKA-2072: Add StopReplica request/response to o.a.k.common.requests
This PR adds StopReplica request and response as it is required by ijuma for KAFKA-2411. Migration of core module is addressed a separate PR (#141).
ijuma Could you review it? gwenshap Could you take a look as well?
Author: David Jacot <da...@gmail.com>
Reviewers: Ismael Juma <is...@juma.me.uk>, Jun Rao <ju...@gmail.com>
Closes #170 from dajac/KAFKA-2072-part-1
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/03f850f6
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/03f850f6
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/03f850f6
Branch: refs/heads/trunk
Commit: 03f850f671ba3035eb150fa243dca0a2aed28302
Parents: 8b538d6
Author: David Jacot <da...@gmail.com>
Authored: Wed Aug 26 17:16:30 2015 -0500
Committer: Jun Rao <ju...@gmail.com>
Committed: Wed Aug 26 17:16:30 2015 -0500
----------------------------------------------------------------------
.../apache/kafka/common/protocol/Protocol.java | 36 ++++--
.../kafka/common/requests/AbstractRequest.java | 4 +-
.../common/requests/StopReplicaRequest.java | 120 +++++++++++++++++++
.../common/requests/StopReplicaResponse.java | 101 ++++++++++++++++
.../common/requests/RequestResponseTest.java | 18 ++-
5 files changed, 269 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/03f850f6/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
index 048d761..a951e90 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
@@ -16,16 +16,12 @@
*/
package org.apache.kafka.common.protocol;
-import static org.apache.kafka.common.protocol.types.Type.BYTES;
-import static org.apache.kafka.common.protocol.types.Type.INT16;
-import static org.apache.kafka.common.protocol.types.Type.INT32;
-import static org.apache.kafka.common.protocol.types.Type.INT64;
-import static org.apache.kafka.common.protocol.types.Type.STRING;
-
import org.apache.kafka.common.protocol.types.ArrayOf;
import org.apache.kafka.common.protocol.types.Field;
import org.apache.kafka.common.protocol.types.Schema;
+import static org.apache.kafka.common.protocol.types.Type.*;
+
public class Protocol {
public static final Schema REQUEST_HEADER = new Schema(new Field("api_key", INT16, "The id of the request type."),
@@ -446,6 +442,29 @@ public class Protocol {
public static final Schema[] HEARTBEAT_REQUEST = new Schema[] {HEARTBEAT_REQUEST_V0};
public static final Schema[] HEARTBEAT_RESPONSE = new Schema[] {HEARTBEAT_RESPONSE_V0};
+ /* Replica api */
+ public static final Schema STOP_REPLICA_REQUEST_PARTITION_V0 = new Schema(new Field("topic", STRING, "Topic name."),
+ new Field("partition", INT32, "Topic partition id."));
+
+ public static final Schema STOP_REPLICA_REQUEST_V0 = new Schema(new Field("controller_id", INT32, "The controller id."),
+ new Field("controller_epoch", INT32, "The controller epoch."),
+ new Field("delete_partitions",
+ INT8,
+ "Boolean which indicates if replica's partitions must be deleted."),
+ new Field("partitions",
+ new ArrayOf(STOP_REPLICA_REQUEST_PARTITION_V0)));
+
+ public static final Schema STOP_REPLICA_RESPONSE_PARTITION_V0 = new Schema(new Field("topic", STRING, "Topic name."),
+ new Field("partition", INT32, "Topic partition id."),
+ new Field("error_code", INT16, "Error code."));
+
+ public static final Schema STOP_REPLICA_RESPONSE_V0 = new Schema(new Field("error_code", INT16, "Error code."),
+ new Field("partitions",
+ new ArrayOf(STOP_REPLICA_RESPONSE_PARTITION_V0)));
+
+ public static final Schema[] STOP_REPLICA_REQUEST = new Schema[] {STOP_REPLICA_REQUEST_V0};
+ public static final Schema[] STOP_REPLICA_RESPONSE = new Schema[] {STOP_REPLICA_RESPONSE_V0};
+
/* an array of all requests and responses with all schema versions */
public static final Schema[][] REQUESTS = new Schema[ApiKeys.MAX_API_KEY + 1][];
public static final Schema[][] RESPONSES = new Schema[ApiKeys.MAX_API_KEY + 1][];
@@ -459,7 +478,7 @@ public class Protocol {
REQUESTS[ApiKeys.LIST_OFFSETS.id] = LIST_OFFSET_REQUEST;
REQUESTS[ApiKeys.METADATA.id] = METADATA_REQUEST;
REQUESTS[ApiKeys.LEADER_AND_ISR.id] = new Schema[] {};
- REQUESTS[ApiKeys.STOP_REPLICA.id] = new Schema[] {};
+ REQUESTS[ApiKeys.STOP_REPLICA.id] = STOP_REPLICA_REQUEST;
REQUESTS[ApiKeys.UPDATE_METADATA_KEY.id] = new Schema[] {};
REQUESTS[ApiKeys.CONTROLLED_SHUTDOWN_KEY.id] = new Schema[] {};
REQUESTS[ApiKeys.OFFSET_COMMIT.id] = OFFSET_COMMIT_REQUEST;
@@ -468,12 +487,13 @@ public class Protocol {
REQUESTS[ApiKeys.JOIN_GROUP.id] = JOIN_GROUP_REQUEST;
REQUESTS[ApiKeys.HEARTBEAT.id] = HEARTBEAT_REQUEST;
+
RESPONSES[ApiKeys.PRODUCE.id] = PRODUCE_RESPONSE;
RESPONSES[ApiKeys.FETCH.id] = FETCH_RESPONSE;
RESPONSES[ApiKeys.LIST_OFFSETS.id] = LIST_OFFSET_RESPONSE;
RESPONSES[ApiKeys.METADATA.id] = METADATA_RESPONSE;
RESPONSES[ApiKeys.LEADER_AND_ISR.id] = new Schema[] {};
- RESPONSES[ApiKeys.STOP_REPLICA.id] = new Schema[] {};
+ RESPONSES[ApiKeys.STOP_REPLICA.id] = STOP_REPLICA_RESPONSE;
RESPONSES[ApiKeys.UPDATE_METADATA_KEY.id] = new Schema[] {};
RESPONSES[ApiKeys.CONTROLLED_SHUTDOWN_KEY.id] = new Schema[] {};
RESPONSES[ApiKeys.OFFSET_COMMIT.id] = OFFSET_COMMIT_RESPONSE;
http://git-wip-us.apache.org/repos/asf/kafka/blob/03f850f6/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
index 5d3d528..e316957 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
@@ -55,8 +55,10 @@ public abstract class AbstractRequest extends AbstractRequestResponse {
return JoinGroupRequest.parse(buffer, versionId);
case HEARTBEAT:
return HeartbeatRequest.parse(buffer, versionId);
+ case STOP_REPLICA:
+ return StopReplicaRequest.parse(buffer, versionId);
default:
return null;
}
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/03f850f6/clients/src/main/java/org/apache/kafka/common/requests/StopReplicaRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/StopReplicaRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/StopReplicaRequest.java
new file mode 100644
index 0000000..85ac394
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/StopReplicaRequest.java
@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.ProtoUtils;
+import org.apache.kafka.common.protocol.types.Schema;
+import org.apache.kafka.common.protocol.types.Struct;
+
+import java.nio.ByteBuffer;
+import java.util.*;
+
+public class StopReplicaRequest extends AbstractRequest {
+ private static final Schema CURRENT_SCHEMA = ProtoUtils.currentRequestSchema(ApiKeys.STOP_REPLICA.id);
+
+ private static final String CONTROLLER_ID_KEY_NAME = "controller_id";
+ private static final String CONTROLLER_EPOCH_KEY_NAME = "controller_epoch";
+ private static final String DELETE_PARTITIONS_KEY_NAME = "delete_partitions";
+ private static final String PARTITIONS_KEY_NAME = "partitions";
+
+ private static final String TOPIC_KEY_NAME = "topic";
+ private static final String PARTITION_KEY_NAME = "partition";
+
+ private final int controllerId;
+ private final int controllerEpoch;
+ private final boolean deletePartitions;
+ private final Set<TopicPartition> partitions;
+
+ public StopReplicaRequest(int controllerId, int controllerEpoch, boolean deletePartitions, Set<TopicPartition> partitions) {
+ super(new Struct(CURRENT_SCHEMA));
+
+ struct.set(CONTROLLER_ID_KEY_NAME, controllerId);
+ struct.set(CONTROLLER_EPOCH_KEY_NAME, controllerEpoch);
+ struct.set(DELETE_PARTITIONS_KEY_NAME, deletePartitions ? (byte) 1 : (byte) 0);
+
+ List<Struct> partitionDatas = new ArrayList<>(partitions.size());
+ for (TopicPartition partition : partitions) {
+ Struct partitionData = struct.instance(PARTITIONS_KEY_NAME);
+ partitionData.set(TOPIC_KEY_NAME, partition.topic());
+ partitionData.set(PARTITION_KEY_NAME, partition.partition());
+ partitionDatas.add(partitionData);
+ }
+
+ struct.set(PARTITIONS_KEY_NAME, partitionDatas.toArray());
+
+ this.controllerId = controllerId;
+ this.controllerEpoch = controllerEpoch;
+ this.deletePartitions = deletePartitions;
+ this.partitions = partitions;
+ }
+
+ public StopReplicaRequest(Struct struct) {
+ super(struct);
+
+ partitions = new HashSet<>();
+ for (Object partitionDataObj : struct.getArray(PARTITIONS_KEY_NAME)) {
+ Struct partitionData = (Struct) partitionDataObj;
+ String topic = partitionData.getString(TOPIC_KEY_NAME);
+ int partition = partitionData.getInt(PARTITION_KEY_NAME);
+ partitions.add(new TopicPartition(topic, partition));
+ }
+
+ controllerId = struct.getInt(CONTROLLER_ID_KEY_NAME);
+ controllerEpoch = struct.getInt(CONTROLLER_EPOCH_KEY_NAME);
+ deletePartitions = ((byte) struct.get(DELETE_PARTITIONS_KEY_NAME)) != 0;
+ }
+
+ @Override
+ public AbstractRequestResponse getErrorResponse(int versionId, Throwable e) {
+ Map<TopicPartition, Short> responses = new HashMap<>(partitions.size());
+ for (TopicPartition partition : partitions) {
+ responses.put(partition, Errors.forException(e).code());
+ }
+
+ switch (versionId) {
+ case 0:
+ return new StopReplicaResponse(Errors.NONE.code(), responses);
+ default:
+ throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
+ versionId, this.getClass().getSimpleName(), ProtoUtils.latestVersion(ApiKeys.STOP_REPLICA.id)));
+ }
+ }
+
+ public int controllerId() {
+ return controllerId;
+ }
+
+ public int controllerEpoch() {
+ return controllerEpoch;
+ }
+
+ public boolean deletePartitions() {
+ return deletePartitions;
+ }
+
+ public Set<TopicPartition> partitions() {
+ return partitions;
+ }
+
+ public static StopReplicaRequest parse(ByteBuffer buffer, int versionId) {
+ return new StopReplicaRequest(ProtoUtils.parseRequest(ApiKeys.STOP_REPLICA.id, versionId, buffer));
+ }
+
+ public static StopReplicaRequest parse(ByteBuffer buffer) {
+ return new StopReplicaRequest((Struct) CURRENT_SCHEMA.read(buffer));
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/03f850f6/clients/src/main/java/org/apache/kafka/common/requests/StopReplicaResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/StopReplicaResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/StopReplicaResponse.java
new file mode 100644
index 0000000..4fa1cac
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/StopReplicaResponse.java
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.ProtoUtils;
+import org.apache.kafka.common.protocol.types.Schema;
+import org.apache.kafka.common.protocol.types.Struct;
+
+import java.nio.ByteBuffer;
+import java.util.*;
+
+public class StopReplicaResponse extends AbstractRequestResponse {
+ private static final Schema CURRENT_SCHEMA = ProtoUtils.currentResponseSchema(ApiKeys.STOP_REPLICA.id);
+
+ private static final String ERROR_CODE_KEY_NAME = "error_code";
+ private static final String PARTITIONS_KEY_NAME = "partitions";
+
+ private static final String PARTITIONS_TOPIC_KEY_NAME = "topic";
+ private static final String PARTITIONS_PARTITION_KEY_NAME = "partition";
+ private static final String PARTITIONS_ERROR_CODE_KEY_NAME = "error_code";
+
+ private final Map<TopicPartition, Short> responses;
+ private final short errorCode;
+
+ /**
+ * Possible error code:
+ *
+ * STALE_CONTROLLER_EPOCH (11)
+ */
+
+ public StopReplicaResponse(Map<TopicPartition, Short> responses) {
+ this(Errors.NONE.code(), responses);
+ }
+
+ public StopReplicaResponse(short errorCode, Map<TopicPartition, Short> responses) {
+ super(new Struct(CURRENT_SCHEMA));
+
+ struct.set(ERROR_CODE_KEY_NAME, errorCode);
+
+ List<Struct> responseDatas = new ArrayList<>(responses.size());
+ for (Map.Entry<TopicPartition, Short> response : responses.entrySet()) {
+ Struct partitionData = struct.instance(PARTITIONS_KEY_NAME);
+ TopicPartition partition = response.getKey();
+ partitionData.set(PARTITIONS_TOPIC_KEY_NAME, partition.topic());
+ partitionData.set(PARTITIONS_PARTITION_KEY_NAME, partition.partition());
+ partitionData.set(PARTITIONS_ERROR_CODE_KEY_NAME, response.getValue());
+ responseDatas.add(partitionData);
+ }
+
+ struct.set(PARTITIONS_KEY_NAME, responseDatas.toArray());
+ struct.set(ERROR_CODE_KEY_NAME, errorCode);
+
+ this.responses = responses;
+ this.errorCode = errorCode;
+ }
+
+ public StopReplicaResponse(Struct struct) {
+ super(struct);
+
+ responses = new HashMap<>();
+ for (Object responseDataObj : struct.getArray(PARTITIONS_KEY_NAME)) {
+ Struct responseData = (Struct) responseDataObj;
+ String topic = responseData.getString(PARTITIONS_TOPIC_KEY_NAME);
+ int partition = responseData.getInt(PARTITIONS_PARTITION_KEY_NAME);
+ short errorCode = responseData.getShort(PARTITIONS_ERROR_CODE_KEY_NAME);
+ responses.put(new TopicPartition(topic, partition), errorCode);
+ }
+
+ errorCode = struct.getShort(ERROR_CODE_KEY_NAME);
+ }
+
+ public Map<TopicPartition, Short> responses() {
+ return responses;
+ }
+
+ public short errorCode() {
+ return errorCode;
+ }
+
+ public static StopReplicaResponse parse(ByteBuffer buffer, int versionId) {
+ return new StopReplicaResponse(ProtoUtils.parseRequest(ApiKeys.STOP_REPLICA.id, versionId, buffer));
+ }
+
+ public static StopReplicaResponse parse(ByteBuffer buffer) {
+ return new StopReplicaResponse((Struct) CURRENT_SCHEMA.read(buffer));
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/03f850f6/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index 9e92da6..353d621 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -29,6 +29,8 @@ import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.HashSet;
+import java.util.Set;
import static org.junit.Assert.assertEquals;
@@ -65,7 +67,10 @@ public class RequestResponseTest {
createOffsetFetchResponse(),
createProduceRequest(),
createProduceRequest().getErrorResponse(0, new UnknownServerException()),
- createProduceResponse());
+ createProduceResponse(),
+ createStopReplicaRequest(),
+ createStopReplicaRequest().getErrorResponse(0, new UnknownServerException()),
+ createStopReplicaResponse());
for (AbstractRequestResponse req: requestResponseList) {
ByteBuffer buffer = ByteBuffer.allocate(req.sizeOf());
@@ -217,4 +222,15 @@ public class RequestResponseTest {
responseData.put(new TopicPartition("test", 0), new ProduceResponse.PartitionResponse(Errors.NONE.code(), 10000));
return new ProduceResponse(responseData, 0);
}
+
+ private AbstractRequest createStopReplicaRequest() {
+ Set<TopicPartition> partitions = new HashSet<>(Arrays.asList(new TopicPartition("test", 0)));
+ return new StopReplicaRequest(0, 1, true, partitions);
+ }
+
+ private AbstractRequestResponse createStopReplicaResponse() {
+ Map<TopicPartition, Short> responses = new HashMap<>();
+ responses.put(new TopicPartition("test", 0), Errors.NONE.code());
+ return new StopReplicaResponse(Errors.NONE.code(), responses);
+ }
}