You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2015/08/27 00:16:43 UTC

kafka git commit: KAFKA-2072: Add StopReplica request/response to o.a.k.common.requests

Repository: kafka
Updated Branches:
  refs/heads/trunk 8b538d62b -> 03f850f67


KAFKA-2072: Add StopReplica request/response to o.a.k.common.requests

This PR adds StopReplica request and response as it is required by ijuma for KAFKA-2411. Migration of the core module is addressed in a separate PR (#141).

ijuma Could you review it? gwenshap Could you take a look as well?

Author: David Jacot <da...@gmail.com>

Reviewers: Ismael Juma <is...@juma.me.uk>, Jun Rao <ju...@gmail.com>

Closes #170 from dajac/KAFKA-2072-part-1


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/03f850f6
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/03f850f6
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/03f850f6

Branch: refs/heads/trunk
Commit: 03f850f671ba3035eb150fa243dca0a2aed28302
Parents: 8b538d6
Author: David Jacot <da...@gmail.com>
Authored: Wed Aug 26 17:16:30 2015 -0500
Committer: Jun Rao <ju...@gmail.com>
Committed: Wed Aug 26 17:16:30 2015 -0500

----------------------------------------------------------------------
 .../apache/kafka/common/protocol/Protocol.java  |  36 ++++--
 .../kafka/common/requests/AbstractRequest.java  |   4 +-
 .../common/requests/StopReplicaRequest.java     | 120 +++++++++++++++++++
 .../common/requests/StopReplicaResponse.java    | 101 ++++++++++++++++
 .../common/requests/RequestResponseTest.java    |  18 ++-
 5 files changed, 269 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/03f850f6/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
index 048d761..a951e90 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
@@ -16,16 +16,12 @@
  */
 package org.apache.kafka.common.protocol;
 
-import static org.apache.kafka.common.protocol.types.Type.BYTES;
-import static org.apache.kafka.common.protocol.types.Type.INT16;
-import static org.apache.kafka.common.protocol.types.Type.INT32;
-import static org.apache.kafka.common.protocol.types.Type.INT64;
-import static org.apache.kafka.common.protocol.types.Type.STRING;
-
 import org.apache.kafka.common.protocol.types.ArrayOf;
 import org.apache.kafka.common.protocol.types.Field;
 import org.apache.kafka.common.protocol.types.Schema;
 
+import static org.apache.kafka.common.protocol.types.Type.*;
+
 public class Protocol {
 
     public static final Schema REQUEST_HEADER = new Schema(new Field("api_key", INT16, "The id of the request type."),
@@ -446,6 +442,29 @@ public class Protocol {
     public static final Schema[] HEARTBEAT_REQUEST = new Schema[] {HEARTBEAT_REQUEST_V0};
     public static final Schema[] HEARTBEAT_RESPONSE = new Schema[] {HEARTBEAT_RESPONSE_V0};
 
+    /* Replica api */
+    public static final Schema STOP_REPLICA_REQUEST_PARTITION_V0 = new Schema(new Field("topic", STRING, "Topic name."),
+                                                                              new Field("partition", INT32, "Topic partition id."));
+
+    public static final Schema STOP_REPLICA_REQUEST_V0 = new Schema(new Field("controller_id", INT32, "The controller id."),
+                                                                    new Field("controller_epoch", INT32, "The controller epoch."),
+                                                                    new Field("delete_partitions",
+                                                                              INT8,
+                                                                              "Boolean which indicates if replica's partitions must be deleted."),
+                                                                    new Field("partitions",
+                                                                              new ArrayOf(STOP_REPLICA_REQUEST_PARTITION_V0)));
+
+    public static final Schema STOP_REPLICA_RESPONSE_PARTITION_V0 = new Schema(new Field("topic", STRING, "Topic name."),
+                                                                               new Field("partition", INT32, "Topic partition id."),
+                                                                               new Field("error_code", INT16, "Error code."));
+
+    public static final Schema STOP_REPLICA_RESPONSE_V0 = new Schema(new Field("error_code", INT16, "Error code."),
+                                                                     new Field("partitions",
+                                                                               new ArrayOf(STOP_REPLICA_RESPONSE_PARTITION_V0)));
+
+    public static final Schema[] STOP_REPLICA_REQUEST = new Schema[] {STOP_REPLICA_REQUEST_V0};
+    public static final Schema[] STOP_REPLICA_RESPONSE = new Schema[] {STOP_REPLICA_RESPONSE_V0};
+
     /* an array of all requests and responses with all schema versions */
     public static final Schema[][] REQUESTS = new Schema[ApiKeys.MAX_API_KEY + 1][];
     public static final Schema[][] RESPONSES = new Schema[ApiKeys.MAX_API_KEY + 1][];
@@ -459,7 +478,7 @@ public class Protocol {
         REQUESTS[ApiKeys.LIST_OFFSETS.id] = LIST_OFFSET_REQUEST;
         REQUESTS[ApiKeys.METADATA.id] = METADATA_REQUEST;
         REQUESTS[ApiKeys.LEADER_AND_ISR.id] = new Schema[] {};
-        REQUESTS[ApiKeys.STOP_REPLICA.id] = new Schema[] {};
+        REQUESTS[ApiKeys.STOP_REPLICA.id] = STOP_REPLICA_REQUEST;
         REQUESTS[ApiKeys.UPDATE_METADATA_KEY.id] = new Schema[] {};
         REQUESTS[ApiKeys.CONTROLLED_SHUTDOWN_KEY.id] = new Schema[] {};
         REQUESTS[ApiKeys.OFFSET_COMMIT.id] = OFFSET_COMMIT_REQUEST;
@@ -468,12 +487,13 @@ public class Protocol {
         REQUESTS[ApiKeys.JOIN_GROUP.id] = JOIN_GROUP_REQUEST;
         REQUESTS[ApiKeys.HEARTBEAT.id] = HEARTBEAT_REQUEST;
 
+
         RESPONSES[ApiKeys.PRODUCE.id] = PRODUCE_RESPONSE;
         RESPONSES[ApiKeys.FETCH.id] = FETCH_RESPONSE;
         RESPONSES[ApiKeys.LIST_OFFSETS.id] = LIST_OFFSET_RESPONSE;
         RESPONSES[ApiKeys.METADATA.id] = METADATA_RESPONSE;
         RESPONSES[ApiKeys.LEADER_AND_ISR.id] = new Schema[] {};
-        RESPONSES[ApiKeys.STOP_REPLICA.id] = new Schema[] {};
+        RESPONSES[ApiKeys.STOP_REPLICA.id] = STOP_REPLICA_RESPONSE;
         RESPONSES[ApiKeys.UPDATE_METADATA_KEY.id] = new Schema[] {};
         RESPONSES[ApiKeys.CONTROLLED_SHUTDOWN_KEY.id] = new Schema[] {};
         RESPONSES[ApiKeys.OFFSET_COMMIT.id] = OFFSET_COMMIT_RESPONSE;

http://git-wip-us.apache.org/repos/asf/kafka/blob/03f850f6/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
index 5d3d528..e316957 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
@@ -55,8 +55,10 @@ public abstract class AbstractRequest extends AbstractRequestResponse {
                 return JoinGroupRequest.parse(buffer, versionId);
             case HEARTBEAT:
                 return HeartbeatRequest.parse(buffer, versionId);
+            case STOP_REPLICA:
+                return StopReplicaRequest.parse(buffer, versionId);
             default:
                 return null;
         }
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/03f850f6/clients/src/main/java/org/apache/kafka/common/requests/StopReplicaRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/StopReplicaRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/StopReplicaRequest.java
new file mode 100644
index 0000000..85ac394
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/StopReplicaRequest.java
@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.ProtoUtils;
+import org.apache.kafka.common.protocol.types.Schema;
+import org.apache.kafka.common.protocol.types.Struct;
+
+import java.nio.ByteBuffer;
+import java.util.*;
+
+public class StopReplicaRequest extends AbstractRequest {
+    private static final Schema CURRENT_SCHEMA = ProtoUtils.currentRequestSchema(ApiKeys.STOP_REPLICA.id);
+
+    private static final String CONTROLLER_ID_KEY_NAME = "controller_id";
+    private static final String CONTROLLER_EPOCH_KEY_NAME = "controller_epoch";
+    private static final String DELETE_PARTITIONS_KEY_NAME = "delete_partitions";
+    private static final String PARTITIONS_KEY_NAME = "partitions";
+
+    private static final String TOPIC_KEY_NAME = "topic";
+    private static final String PARTITION_KEY_NAME = "partition";
+
+    private final int controllerId;
+    private final int controllerEpoch;
+    private final boolean deletePartitions;
+    private final Set<TopicPartition> partitions;
+
+    /**
+     * Creates a request from its individual fields and serializes them into a struct built on
+     * the current StopReplica request schema.
+     *
+     * @param controllerId value stored under "controller_id"
+     * @param controllerEpoch value stored under "controller_epoch"
+     * @param deletePartitions stored under "delete_partitions" as the byte 1 when true and 0 when false
+     * @param partitions topic partitions written to the "partitions" array; the set is kept by
+     *                   reference (no copy is made)
+     */
+    public StopReplicaRequest(int controllerId, int controllerEpoch, boolean deletePartitions, Set<TopicPartition> partitions) {
+        super(new Struct(CURRENT_SCHEMA));
+
+        struct.set(CONTROLLER_ID_KEY_NAME, controllerId);
+        struct.set(CONTROLLER_EPOCH_KEY_NAME, controllerEpoch);
+        // The schema declares this field as INT8, so the boolean is encoded as a single byte.
+        struct.set(DELETE_PARTITIONS_KEY_NAME, deletePartitions ? (byte) 1 : (byte) 0);
+
+        // One nested struct (topic, partition) per requested topic partition.
+        List<Struct> partitionDatas = new ArrayList<>(partitions.size());
+        for (TopicPartition partition : partitions) {
+            Struct partitionData = struct.instance(PARTITIONS_KEY_NAME);
+            partitionData.set(TOPIC_KEY_NAME, partition.topic());
+            partitionData.set(PARTITION_KEY_NAME, partition.partition());
+            partitionDatas.add(partitionData);
+        }
+
+        struct.set(PARTITIONS_KEY_NAME, partitionDatas.toArray());
+
+        this.controllerId = controllerId;
+        this.controllerEpoch = controllerEpoch;
+        this.deletePartitions = deletePartitions;
+        this.partitions = partitions;
+    }
+
+    /**
+     * Creates a request from an already populated struct and extracts its fields.
+     *
+     * @param struct struct holding the "controller_id", "controller_epoch", "delete_partitions"
+     *               and "partitions" fields
+     */
+    public StopReplicaRequest(Struct struct) {
+        super(struct);
+
+        // Rebuild the set of topic partitions from the nested (topic, partition) structs.
+        partitions = new HashSet<>();
+        for (Object partitionDataObj : struct.getArray(PARTITIONS_KEY_NAME)) {
+            Struct partitionData = (Struct) partitionDataObj;
+            String topic = partitionData.getString(TOPIC_KEY_NAME);
+            int partition = partitionData.getInt(PARTITION_KEY_NAME);
+            partitions.add(new TopicPartition(topic, partition));
+        }
+
+        controllerId = struct.getInt(CONTROLLER_ID_KEY_NAME);
+        controllerEpoch = struct.getInt(CONTROLLER_EPOCH_KEY_NAME);
+        // The flag is read back as a byte; any non-zero value means true.
+        deletePartitions = ((byte) struct.get(DELETE_PARTITIONS_KEY_NAME)) != 0;
+    }
+
+    @Override
+    public AbstractRequestResponse getErrorResponse(int versionId, Throwable e) {
+        Map<TopicPartition, Short> responses = new HashMap<>(partitions.size());
+        for (TopicPartition partition : partitions) {
+            responses.put(partition, Errors.forException(e).code());
+        }
+
+        switch (versionId) {
+            case 0:
+                return new StopReplicaResponse(Errors.NONE.code(), responses);
+            default:
+                throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
+                        versionId, this.getClass().getSimpleName(), ProtoUtils.latestVersion(ApiKeys.STOP_REPLICA.id)));
+        }
+    }
+
+    public int controllerId() {
+        return controllerId;
+    }
+
+    public int controllerEpoch() {
+        return controllerEpoch;
+    }
+
+    public boolean deletePartitions() {
+        return deletePartitions;
+    }
+
+    public Set<TopicPartition> partitions() {
+        return partitions;
+    }
+
+    /**
+     * Reads a StopReplica request from the buffer using the request schema of the given version.
+     *
+     * @param buffer buffer holding the serialized request
+     * @param versionId version of the request schema to parse with
+     * @return the parsed request
+     */
+    public static StopReplicaRequest parse(ByteBuffer buffer, int versionId) {
+        return new StopReplicaRequest(ProtoUtils.parseRequest(ApiKeys.STOP_REPLICA.id, versionId, buffer));
+    }
+
+    /**
+     * Reads a StopReplica request from the buffer using the current request schema.
+     *
+     * @param buffer buffer holding the serialized request
+     * @return the parsed request
+     */
+    public static StopReplicaRequest parse(ByteBuffer buffer) {
+        return new StopReplicaRequest((Struct) CURRENT_SCHEMA.read(buffer));
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/03f850f6/clients/src/main/java/org/apache/kafka/common/requests/StopReplicaResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/StopReplicaResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/StopReplicaResponse.java
new file mode 100644
index 0000000..4fa1cac
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/StopReplicaResponse.java
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.ProtoUtils;
+import org.apache.kafka.common.protocol.types.Schema;
+import org.apache.kafka.common.protocol.types.Struct;
+
+import java.nio.ByteBuffer;
+import java.util.*;
+
+public class StopReplicaResponse extends AbstractRequestResponse {
+    private static final Schema CURRENT_SCHEMA = ProtoUtils.currentResponseSchema(ApiKeys.STOP_REPLICA.id);
+
+    private static final String ERROR_CODE_KEY_NAME = "error_code";
+    private static final String PARTITIONS_KEY_NAME = "partitions";
+
+    private static final String PARTITIONS_TOPIC_KEY_NAME = "topic";
+    private static final String PARTITIONS_PARTITION_KEY_NAME = "partition";
+    private static final String PARTITIONS_ERROR_CODE_KEY_NAME = "error_code";
+
+    private final Map<TopicPartition, Short> responses;
+    private final short errorCode;
+
+    /**
+     * Possible error code:
+     *
+     * STALE_CONTROLLER_EPOCH (11)
+     */
+
+    /**
+     * Creates a response whose top-level error code is {@code Errors.NONE}.
+     *
+     * @param responses error code for each topic partition
+     */
+    public StopReplicaResponse(Map<TopicPartition, Short> responses) {
+        this(Errors.NONE.code(), responses);
+    }
+
+    /**
+     * Creates a response from its individual fields and serializes them into a struct built on
+     * the current StopReplica response schema.
+     *
+     * @param errorCode top-level error code, stored under "error_code"
+     * @param responses error code for each topic partition, written to the "partitions" array;
+     *                  the map is kept by reference (no copy is made)
+     */
+    public StopReplicaResponse(short errorCode, Map<TopicPartition, Short> responses) {
+        super(new Struct(CURRENT_SCHEMA));
+
+        // One nested struct (topic, partition, error_code) per topic partition.
+        List<Struct> responseDatas = new ArrayList<>(responses.size());
+        for (Map.Entry<TopicPartition, Short> response : responses.entrySet()) {
+            Struct partitionData = struct.instance(PARTITIONS_KEY_NAME);
+            TopicPartition partition = response.getKey();
+            partitionData.set(PARTITIONS_TOPIC_KEY_NAME, partition.topic());
+            partitionData.set(PARTITIONS_PARTITION_KEY_NAME, partition.partition());
+            partitionData.set(PARTITIONS_ERROR_CODE_KEY_NAME, response.getValue());
+            responseDatas.add(partitionData);
+        }
+
+        // The top-level error code is set exactly once (it was previously written twice).
+        struct.set(ERROR_CODE_KEY_NAME, errorCode);
+        struct.set(PARTITIONS_KEY_NAME, responseDatas.toArray());
+
+        this.responses = responses;
+        this.errorCode = errorCode;
+    }
+
+    /**
+     * Creates a response from an already populated struct and extracts its fields.
+     *
+     * @param struct struct holding the top-level "error_code" and the "partitions" array
+     */
+    public StopReplicaResponse(Struct struct) {
+        super(struct);
+
+        responses = new HashMap<>();
+        for (Object responseDataObj : struct.getArray(PARTITIONS_KEY_NAME)) {
+            Struct responseData = (Struct) responseDataObj;
+            String topic = responseData.getString(PARTITIONS_TOPIC_KEY_NAME);
+            int partition = responseData.getInt(PARTITIONS_PARTITION_KEY_NAME);
+            // Loop-local per-partition code; it shadows the errorCode field inside this loop only.
+            short errorCode = responseData.getShort(PARTITIONS_ERROR_CODE_KEY_NAME);
+            responses.put(new TopicPartition(topic, partition), errorCode);
+        }
+
+        // Outside the loop this assigns the field holding the top-level error code.
+        errorCode = struct.getShort(ERROR_CODE_KEY_NAME);
+    }
+
+    public Map<TopicPartition, Short> responses() {
+        return responses;
+    }
+
+    public short errorCode() {
+        return errorCode;
+    }
+
+    /**
+     * Reads a StopReplica response from the buffer using the response schema of the given version.
+     *
+     * @param buffer buffer holding the serialized response
+     * @param versionId version of the response schema to parse with
+     * @return the parsed response
+     */
+    public static StopReplicaResponse parse(ByteBuffer buffer, int versionId) {
+        // A response must be decoded with the response schema; the request schema has a different layout.
+        return new StopReplicaResponse((Struct) ProtoUtils.responseSchema(ApiKeys.STOP_REPLICA.id, versionId).read(buffer));
+    }
+
+    /**
+     * Reads a StopReplica response from the buffer using the current response schema.
+     *
+     * @param buffer buffer holding the serialized response
+     * @return the parsed response
+     */
+    public static StopReplicaResponse parse(ByteBuffer buffer) {
+        return new StopReplicaResponse((Struct) CURRENT_SCHEMA.read(buffer));
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/03f850f6/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index 9e92da6..353d621 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -29,6 +29,8 @@ import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.HashSet;
+import java.util.Set;
 
 import static org.junit.Assert.assertEquals;
 
@@ -65,7 +67,10 @@ public class RequestResponseTest {
                 createOffsetFetchResponse(),
                 createProduceRequest(),
                 createProduceRequest().getErrorResponse(0, new UnknownServerException()),
-                createProduceResponse());
+                createProduceResponse(),
+                createStopReplicaRequest(),
+                createStopReplicaRequest().getErrorResponse(0, new UnknownServerException()),
+                createStopReplicaResponse());
 
         for (AbstractRequestResponse req: requestResponseList) {
             ByteBuffer buffer = ByteBuffer.allocate(req.sizeOf());
@@ -217,4 +222,15 @@ public class RequestResponseTest {
         responseData.put(new TopicPartition("test", 0), new ProduceResponse.PartitionResponse(Errors.NONE.code(), 10000));
         return new ProduceResponse(responseData, 0);
     }
+
+    // Builds a StopReplicaRequest fixture: controller id 0, controller epoch 1, deletePartitions
+    // set to true, and the single topic partition ("test", 0).
+    private AbstractRequest createStopReplicaRequest() {
+        Set<TopicPartition> partitions = new HashSet<>(Arrays.asList(new TopicPartition("test", 0)));
+        return new StopReplicaRequest(0, 1, true, partitions);
+    }
+
+    // Builds a StopReplicaResponse fixture with no top-level error and a NONE error code for
+    // the single topic partition ("test", 0).
+    private AbstractRequestResponse createStopReplicaResponse() {
+        Map<TopicPartition, Short> responses = new HashMap<>();
+        responses.put(new TopicPartition("test", 0), Errors.NONE.code());
+        return new StopReplicaResponse(Errors.NONE.code(), responses);
+    }
 }