Posted to commits@kafka.apache.org by gu...@apache.org on 2017/11/16 17:52:41 UTC

kafka git commit: KAFKA-5925: Adding records deletion operation to the new Admin Client API

Repository: kafka
Updated Branches:
  refs/heads/trunk 539c4d53f -> 53c5ccb33


KAFKA-5925: Adding records deletion operation to the new Admin Client API

This is the PR for [KIP-204](https://cwiki.apache.org/confluence/display/KAFKA/KIP-204+%3A+Adding+records+deletion+operation+to+the+new+Admin+Client+API), adding the `deleteRecords` operation to the new Admin Client (the operation is already available in the "legacy" admin client).
Unit and integration tests are added as well; the integration tests are adapted from the "legacy" integration tests so that the new addition is tested in the same way as the "legacy" one.
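
For illustration, a minimal end-to-end usage sketch of the new API follows (the broker address, topic name, and offset are placeholders chosen for this example, not values from the commit):

    import java.util.HashMap;
    import java.util.Map;
    import java.util.Properties;
    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.admin.AdminClientConfig;
    import org.apache.kafka.clients.admin.DeleteRecordsResult;
    import org.apache.kafka.clients.admin.RecordsToDelete;
    import org.apache.kafka.common.TopicPartition;

    public class DeleteRecordsExample {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            try (AdminClient admin = AdminClient.create(props)) {
                // Delete every record with an offset smaller than 5 in partition 0 of "my-topic".
                Map<TopicPartition, RecordsToDelete> recordsToDelete = new HashMap<>();
                recordsToDelete.put(new TopicPartition("my-topic", 0), RecordsToDelete.beforeOffset(5L));
                DeleteRecordsResult result = admin.deleteRecords(recordsToDelete);
                // all() succeeds only if every per-partition deletion succeeds.
                result.all().get();
            }
        }
    }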

Author: Paolo Patierno <pp...@live.com>

Reviewers: Colin P. Mccabe <cm...@confluent.io>, Guozhang Wang <wa...@gmail.com>

Closes #4132 from ppatierno/kafka-5925


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/53c5ccb3
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/53c5ccb3
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/53c5ccb3

Branch: refs/heads/trunk
Commit: 53c5ccb33401b772678e9d1ef3bebd5784e22814
Parents: 539c4d5
Author: Paolo Patierno <pp...@live.com>
Authored: Thu Nov 16 09:52:33 2017 -0800
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Thu Nov 16 09:52:33 2017 -0800

----------------------------------------------------------------------
 .../apache/kafka/clients/admin/AdminClient.java |  27 ++++
 .../clients/admin/DeleteRecordsOptions.java     |  32 ++++
 .../clients/admin/DeleteRecordsResult.java      |  54 +++++++
 .../kafka/clients/admin/DeletedRecords.java     |  47 ++++++
 .../kafka/clients/admin/KafkaAdminClient.java   | 110 +++++++++++++
 .../kafka/clients/admin/RecordsToDelete.java    |  58 +++++++
 .../clients/admin/KafkaAdminClientTest.java     | 115 ++++++++++++++
 .../kafka/api/AdminClientIntegrationTest.scala  | 154 +++++++++++++++++--
 8 files changed, 587 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/53c5ccb3/clients/src/main/java/org/apache/kafka/clients/admin/AdminClient.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/AdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/AdminClient.java
index 3f56399..abe666a 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/AdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/AdminClient.java
@@ -17,6 +17,7 @@
 
 package org.apache.kafka.clients.admin;
 
+import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.TopicPartitionReplica;
 import org.apache.kafka.common.acl.AclBinding;
 import org.apache.kafka.common.acl.AclBindingFilter;
@@ -508,4 +509,30 @@ public abstract class AdminClient implements AutoCloseable {
     public abstract CreatePartitionsResult createPartitions(Map<String, NewPartitions> newPartitions,
                                                             CreatePartitionsOptions options);
 
+    /**
+     * Delete records whose offset is smaller than the given offset of the corresponding partition.
+     *
+     * This is a convenience method for {@link #deleteRecords(Map, DeleteRecordsOptions)} with default options.
+     * See the overload for more details.
+     *
+     * This operation is supported by brokers with version 0.11.0.0 or higher.
+     *
+     * @param recordsToDelete       The topic partitions and the offsets before which records will be deleted.
+     * @return                      The DeleteRecordsResult.
+     */
+    public DeleteRecordsResult deleteRecords(Map<TopicPartition, RecordsToDelete> recordsToDelete) {
+        return deleteRecords(recordsToDelete, new DeleteRecordsOptions());
+    }
+
+    /**
+     * Delete records whose offset is smaller than the given offset of the corresponding partition.
+     *
+     * This operation is supported by brokers with version 0.11.0.0 or higher.
+     *
+     * @param recordsToDelete       The topic partitions and the offsets before which records will be deleted.
+     * @param options               The options to use when deleting records.
+     * @return                      The DeleteRecordsResult.
+     */
+    public abstract DeleteRecordsResult deleteRecords(Map<TopicPartition, RecordsToDelete> recordsToDelete,
+                                                      DeleteRecordsOptions options);
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/53c5ccb3/clients/src/main/java/org/apache/kafka/clients/admin/DeleteRecordsOptions.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DeleteRecordsOptions.java b/clients/src/main/java/org/apache/kafka/clients/admin/DeleteRecordsOptions.java
new file mode 100644
index 0000000..2581694
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/DeleteRecordsOptions.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+import java.util.Map;
+
+/**
+ * Options for {@link AdminClient#deleteRecords(Map, DeleteRecordsOptions)}.
+ *
+ * The API of this class is evolving, see {@link AdminClient} for details.
+ */
+@InterfaceStability.Evolving
+public class DeleteRecordsOptions extends AbstractOptions<DeleteRecordsOptions> {
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/53c5ccb3/clients/src/main/java/org/apache/kafka/clients/admin/DeleteRecordsResult.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DeleteRecordsResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/DeleteRecordsResult.java
new file mode 100644
index 0000000..44c5252
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/DeleteRecordsResult.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+import java.util.Map;
+
+/**
+ * The result of the {@link AdminClient#deleteRecords(Map)} call.
+ *
+ * The API of this class is evolving, see {@link AdminClient} for details.
+ */
+@InterfaceStability.Evolving
+public class DeleteRecordsResult {
+
+    private final Map<TopicPartition, KafkaFuture<DeletedRecords>> futures;
+
+    DeleteRecordsResult(Map<TopicPartition, KafkaFuture<DeletedRecords>> futures) {
+        this.futures = futures;
+    }
+
+    /**
+     * Return a map from topic partition to futures which can be used to check the status of
+     * individual deletions.
+     */
+    public Map<TopicPartition, KafkaFuture<DeletedRecords>> lowWatermarks() {
+        return futures;
+    }
+
+    /**
+     * Return a future which succeeds only if all the record deletions succeed.
+     */
+    public KafkaFuture<Void> all() {
+        return KafkaFuture.allOf(futures.values().toArray(new KafkaFuture[0]));
+    }
+}
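
For reference, a sketch of a hypothetical helper that consumes this result per partition instead of via all(); it assumes the imports from the earlier sketch plus java.util.concurrent.ExecutionException, org.apache.kafka.common.KafkaFuture, and the DeletedRecords class added below:

    static void printLowWatermarks(AdminClient admin,
                                   Map<TopicPartition, RecordsToDelete> recordsToDelete)
            throws InterruptedException {
        DeleteRecordsResult result = admin.deleteRecords(recordsToDelete);
        for (Map.Entry<TopicPartition, KafkaFuture<DeletedRecords>> entry
                : result.lowWatermarks().entrySet()) {
            try {
                // On success the future carries the partition's new low watermark.
                System.out.println(entry.getKey() + " -> " + entry.getValue().get().lowWatermark());
            } catch (ExecutionException e) {
                // On failure the cause is the broker-side error for that partition.
                System.err.println(entry.getKey() + " failed: " + e.getCause());
            }
        }
    }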

http://git-wip-us.apache.org/repos/asf/kafka/blob/53c5ccb3/clients/src/main/java/org/apache/kafka/clients/admin/DeletedRecords.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DeletedRecords.java b/clients/src/main/java/org/apache/kafka/clients/admin/DeletedRecords.java
new file mode 100644
index 0000000..983ae28
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/DeletedRecords.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+/**
+ * Represents information about deleted records.
+ *
+ * The API for this class is still evolving and we may break compatibility in minor releases, if necessary.
+ */
+@InterfaceStability.Evolving
+public class DeletedRecords {
+
+    private final long lowWatermark;
+
+    /**
+     * Create an instance of this class with the provided parameters.
+     *
+     * @param lowWatermark  "low watermark" for the topic partition on which the deletion was executed
+     */
+    public DeletedRecords(long lowWatermark) {
+        this.lowWatermark = lowWatermark;
+    }
+
+    /**
+     * Return the "low watermark" for the topic partition on which the deletion was executed
+     */
+    public long lowWatermark() {
+        return lowWatermark;
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/53c5ccb3/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index 1945f50..cf48846 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -77,6 +77,8 @@ import org.apache.kafka.common.requests.DeleteAclsRequest;
 import org.apache.kafka.common.requests.DeleteAclsResponse;
 import org.apache.kafka.common.requests.DeleteAclsResponse.AclDeletionResult;
 import org.apache.kafka.common.requests.DeleteAclsResponse.AclFilterResponse;
+import org.apache.kafka.common.requests.DeleteRecordsRequest;
+import org.apache.kafka.common.requests.DeleteRecordsResponse;
 import org.apache.kafka.common.requests.DeleteTopicsRequest;
 import org.apache.kafka.common.requests.DeleteTopicsResponse;
 import org.apache.kafka.common.requests.DescribeAclsRequest;
@@ -1871,4 +1873,112 @@ public class KafkaAdminClient extends AdminClient {
         return new CreatePartitionsResult(new HashMap<String, KafkaFuture<Void>>(futures));
     }
 
+    public DeleteRecordsResult deleteRecords(final Map<TopicPartition, RecordsToDelete> recordsToDelete,
+                                             final DeleteRecordsOptions options) {
+
+        // Requests must be sent to the leader node of each partition, so the provided map
+        // is regrouped into one map of topic-partitions (and target offsets) per leader node.
+
+        final Map<TopicPartition, KafkaFutureImpl<DeletedRecords>> futures = new HashMap<>(recordsToDelete.size());
+        for (TopicPartition topicPartition: recordsToDelete.keySet()) {
+            futures.put(topicPartition, new KafkaFutureImpl<DeletedRecords>());
+        }
+
+        // Collect the topic names so that metadata can be requested for them
+        final Set<String> topics = new HashSet<>();
+        for (TopicPartition topicPartition: recordsToDelete.keySet()) {
+            topics.add(topicPartition.topic());
+        }
+
+        final long nowMetadata = time.milliseconds();
+        final long deadline = calcDeadlineMs(nowMetadata, options.timeoutMs());
+        // Request topic metadata in order to find the partition leaders
+        runnable.call(new Call("topicsMetadata", deadline,
+                new LeastLoadedNodeProvider()) {
+
+            @Override
+            AbstractRequest.Builder createRequest(int timeoutMs) {
+                return new MetadataRequest.Builder(new ArrayList<>(topics), false);
+            }
+
+            @Override
+            void handleResponse(AbstractResponse abstractResponse) {
+                MetadataResponse response = (MetadataResponse) abstractResponse;
+
+                Map<String, Errors> errors = response.errors();
+                Cluster cluster = response.cluster();
+
+                // Complete the futures exceptionally for topics that returned errors
+                for (Map.Entry<String, Errors> topicError: errors.entrySet()) {
+
+                    for (Map.Entry<TopicPartition, KafkaFutureImpl<DeletedRecords>> future: futures.entrySet()) {
+                        if (future.getKey().topic().equals(topicError.getKey())) {
+                            future.getValue().completeExceptionally(topicError.getValue().exception());
+                        }
+                    }
+                }
+
+                // Group the topic partitions by leader node
+                Map<Node, Map<TopicPartition, Long>> leaders = new HashMap<>();
+                for (Map.Entry<TopicPartition, RecordsToDelete> entry: recordsToDelete.entrySet()) {
+
+                    // Skip sending delete requests for topics that returned errors
+                    if (!errors.containsKey(entry.getKey().topic())) {
+
+                        Node node = cluster.leaderFor(entry.getKey());
+                        if (node != null) {
+                            if (!leaders.containsKey(node))
+                                leaders.put(node, new HashMap<TopicPartition, Long>());
+                            leaders.get(node).put(entry.getKey(), entry.getValue().beforeOffset());
+                        } else {
+                            KafkaFutureImpl<DeletedRecords> future = futures.get(entry.getKey());
+                            future.completeExceptionally(Errors.LEADER_NOT_AVAILABLE.exception());
+                        }
+                    }
+                }
+
+                for (final Map.Entry<Node, Map<TopicPartition, Long>> entry: leaders.entrySet()) {
+
+                    final long nowDelete = time.milliseconds();
+
+                    final int brokerId = entry.getKey().id();
+
+                    runnable.call(new Call("deleteRecords", deadline,
+                            new ConstantNodeIdProvider(brokerId)) {
+
+                        @Override
+                        AbstractRequest.Builder createRequest(int timeoutMs) {
+                            return new DeleteRecordsRequest.Builder(timeoutMs, entry.getValue());
+                        }
+
+                        @Override
+                        void handleResponse(AbstractResponse abstractResponse) {
+                            DeleteRecordsResponse response = (DeleteRecordsResponse) abstractResponse;
+                            for (Map.Entry<TopicPartition, DeleteRecordsResponse.PartitionResponse> result: response.responses().entrySet()) {
+
+                                KafkaFutureImpl<DeletedRecords> future = futures.get(result.getKey());
+                                if (result.getValue().error == Errors.NONE) {
+                                    future.complete(new DeletedRecords(result.getValue().lowWatermark));
+                                } else {
+                                    future.completeExceptionally(result.getValue().error.exception());
+                                }
+                            }
+                        }
+
+                        @Override
+                        void handleFailure(Throwable throwable) {
+                            completeAllExceptionally(futures.values(), throwable);
+                        }
+                    }, nowDelete);
+                }
+            }
+
+            @Override
+            void handleFailure(Throwable throwable) {
+                completeAllExceptionally(futures.values(), throwable);
+            }
+        }, nowMetadata);
+
+        return new DeleteRecordsResult(new HashMap<TopicPartition, KafkaFuture<DeletedRecords>>(futures));
+    }
 }
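
The heart of the implementation above is the regrouping step: once metadata arrives, partitions are bucketed by leader so that a single DeleteRecordsRequest goes to each broker. Isolated and simplified (a Java 8 style sketch; the actual code above keeps Java 7 compatibility and handles the null-leader case by failing the future), the pattern is:

    // cluster comes from the MetadataResponse, as in the handler above.
    Map<Node, Map<TopicPartition, Long>> leaders = new HashMap<>();
    for (Map.Entry<TopicPartition, RecordsToDelete> entry : recordsToDelete.entrySet()) {
        Node leader = cluster.leaderFor(entry.getKey());   // null if no leader is known
        if (leader != null) {
            leaders.computeIfAbsent(leader, node -> new HashMap<>())
                   .put(entry.getKey(), entry.getValue().beforeOffset());
        }
    }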

http://git-wip-us.apache.org/repos/asf/kafka/blob/53c5ccb3/clients/src/main/java/org/apache/kafka/clients/admin/RecordsToDelete.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/RecordsToDelete.java b/clients/src/main/java/org/apache/kafka/clients/admin/RecordsToDelete.java
new file mode 100644
index 0000000..1981929
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/RecordsToDelete.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+import java.util.Map;
+
+/**
+ * Describes records to delete in a call to {@link AdminClient#deleteRecords(Map)}
+ *
+ * The API of this class is evolving, see {@link AdminClient} for details.
+ */
+@InterfaceStability.Evolving
+public class RecordsToDelete {
+
+    private long offset;
+
+    private RecordsToDelete(long offset) {
+        this.offset = offset;
+    }
+
+    /**
+     * Delete all the records before the given {@code offset}
+     *
+     * @param offset    the offset before which all records will be deleted
+     */
+    public static RecordsToDelete beforeOffset(long offset) {
+        return new RecordsToDelete(offset);
+    }
+
+    /**
+     * The offset before which all records will be deleted
+     */
+    public long beforeOffset() {
+        return offset;
+    }
+
+    @Override
+    public String toString() {
+        return "(beforeOffset = " + offset + ")";
+    }
+}
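
As the integration tests below exercise, the special value DeleteRecordsRequest.HIGH_WATERMARK can be passed to beforeOffset to request deletion of all records up to a partition's current high watermark:

    // Delete everything currently committed to the partition.
    RecordsToDelete everything = RecordsToDelete.beforeOffset(DeleteRecordsRequest.HIGH_WATERMARK);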

http://git-wip-us.apache.org/repos/asf/kafka/blob/53c5ccb3/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index 2412d03..c0fe73c 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -22,6 +22,7 @@ import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.KafkaFuture;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.acl.AccessControlEntry;
 import org.apache.kafka.common.acl.AccessControlEntryFilter;
 import org.apache.kafka.common.acl.AclBinding;
@@ -30,8 +31,12 @@ import org.apache.kafka.common.acl.AclOperation;
 import org.apache.kafka.common.acl.AclPermissionType;
 import org.apache.kafka.common.config.ConfigResource;
 import org.apache.kafka.common.errors.InvalidTopicException;
+import org.apache.kafka.common.errors.LeaderNotAvailableException;
+import org.apache.kafka.common.errors.NotLeaderForPartitionException;
+import org.apache.kafka.common.errors.OffsetOutOfRangeException;
 import org.apache.kafka.common.errors.SecurityDisabledException;
 import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.requests.CreatePartitionsResponse;
 import org.apache.kafka.common.requests.ApiError;
@@ -41,8 +46,10 @@ import org.apache.kafka.common.requests.CreateTopicsResponse;
 import org.apache.kafka.common.requests.DeleteAclsResponse;
 import org.apache.kafka.common.requests.DeleteAclsResponse.AclDeletionResult;
 import org.apache.kafka.common.requests.DeleteAclsResponse.AclFilterResponse;
+import org.apache.kafka.common.requests.DeleteRecordsResponse;
 import org.apache.kafka.common.requests.DescribeAclsResponse;
 import org.apache.kafka.common.requests.DescribeConfigsResponse;
+import org.apache.kafka.common.requests.MetadataResponse;
 import org.apache.kafka.common.resource.Resource;
 import org.apache.kafka.common.resource.ResourceFilter;
 import org.apache.kafka.common.resource.ResourceType;
@@ -56,6 +63,7 @@ import org.junit.rules.Timeout;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
@@ -414,6 +422,113 @@ public class KafkaAdminClientTest {
         }
     }
 
+    @Test
+    public void testDeleteRecords() throws Exception {
+
+        HashMap<Integer, Node> nodes = new HashMap<>();
+        nodes.put(0, new Node(0, "localhost", 8121));
+        List<PartitionInfo> partitionInfos = new ArrayList<>();
+        partitionInfos.add(new PartitionInfo("my_topic", 0, nodes.get(0), new Node[] {nodes.get(0)}, new Node[] {nodes.get(0)}));
+        partitionInfos.add(new PartitionInfo("my_topic", 1, nodes.get(0), new Node[] {nodes.get(0)}, new Node[] {nodes.get(0)}));
+        partitionInfos.add(new PartitionInfo("my_topic", 2, null, new Node[] {nodes.get(0)}, new Node[] {nodes.get(0)}));
+        partitionInfos.add(new PartitionInfo("my_topic", 3, nodes.get(0), new Node[] {nodes.get(0)}, new Node[] {nodes.get(0)}));
+        partitionInfos.add(new PartitionInfo("my_topic", 4, nodes.get(0), new Node[] {nodes.get(0)}, new Node[] {nodes.get(0)}));
+        Cluster cluster = new Cluster("mockClusterId", nodes.values(),
+                partitionInfos, Collections.<String>emptySet(),
+                Collections.<String>emptySet(), nodes.get(0));
+
+        TopicPartition myTopicPartition0 = new TopicPartition("my_topic", 0);
+        TopicPartition myTopicPartition1 = new TopicPartition("my_topic", 1);
+        TopicPartition myTopicPartition2 = new TopicPartition("my_topic", 2);
+        TopicPartition myTopicPartition3 = new TopicPartition("my_topic", 3);
+        TopicPartition myTopicPartition4 = new TopicPartition("my_topic", 4);
+
+        try (MockKafkaAdminClientEnv env = new MockKafkaAdminClientEnv(cluster)) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+            env.kafkaClient().prepareMetadataUpdate(env.cluster(), Collections.<String>emptySet());
+            env.kafkaClient().setNode(env.cluster().nodes().get(0));
+
+            Map<TopicPartition, DeleteRecordsResponse.PartitionResponse> m = new HashMap<>();
+            m.put(myTopicPartition0,
+                    new DeleteRecordsResponse.PartitionResponse(3, Errors.NONE));
+            m.put(myTopicPartition1,
+                    new DeleteRecordsResponse.PartitionResponse(DeleteRecordsResponse.INVALID_LOW_WATERMARK, Errors.OFFSET_OUT_OF_RANGE));
+            m.put(myTopicPartition3,
+                    new DeleteRecordsResponse.PartitionResponse(DeleteRecordsResponse.INVALID_LOW_WATERMARK, Errors.NOT_LEADER_FOR_PARTITION));
+            m.put(myTopicPartition4,
+                    new DeleteRecordsResponse.PartitionResponse(DeleteRecordsResponse.INVALID_LOW_WATERMARK, Errors.UNKNOWN_TOPIC_OR_PARTITION));
+
+            List<MetadataResponse.TopicMetadata> t = new ArrayList<>();
+            List<MetadataResponse.PartitionMetadata> p = new ArrayList<>();
+            p.add(new MetadataResponse.PartitionMetadata(Errors.NONE, 0, nodes.get(0),
+                    Collections.singletonList(nodes.get(0)), Collections.singletonList(nodes.get(0)), Collections.<Node>emptyList()));
+            p.add(new MetadataResponse.PartitionMetadata(Errors.NONE, 1, nodes.get(0),
+                    Collections.singletonList(nodes.get(0)), Collections.singletonList(nodes.get(0)), Collections.<Node>emptyList()));
+            p.add(new MetadataResponse.PartitionMetadata(Errors.LEADER_NOT_AVAILABLE, 2, null,
+                    Collections.singletonList(nodes.get(0)), Collections.singletonList(nodes.get(0)), Collections.<Node>emptyList()));
+            p.add(new MetadataResponse.PartitionMetadata(Errors.NONE, 3, nodes.get(0),
+                    Collections.singletonList(nodes.get(0)), Collections.singletonList(nodes.get(0)), Collections.<Node>emptyList()));
+            p.add(new MetadataResponse.PartitionMetadata(Errors.NONE, 4, nodes.get(0),
+                    Collections.singletonList(nodes.get(0)), Collections.singletonList(nodes.get(0)), Collections.<Node>emptyList()));
+
+            t.add(new MetadataResponse.TopicMetadata(Errors.NONE, "my_topic", false, p));
+
+            env.kafkaClient().prepareResponse(new MetadataResponse(cluster.nodes(), cluster.clusterResource().clusterId(), cluster.controller().id(), t));
+            env.kafkaClient().prepareResponse(new DeleteRecordsResponse(0, m));
+
+            Map<TopicPartition, RecordsToDelete> recordsToDelete = new HashMap<>();
+            recordsToDelete.put(myTopicPartition0, RecordsToDelete.beforeOffset(3L));
+            recordsToDelete.put(myTopicPartition1, RecordsToDelete.beforeOffset(10L));
+            recordsToDelete.put(myTopicPartition2, RecordsToDelete.beforeOffset(10L));
+            recordsToDelete.put(myTopicPartition3, RecordsToDelete.beforeOffset(10L));
+            recordsToDelete.put(myTopicPartition4, RecordsToDelete.beforeOffset(10L));
+
+            DeleteRecordsResult results = env.adminClient().deleteRecords(recordsToDelete);
+
+            // success on records deletion for partition 0
+            Map<TopicPartition, KafkaFuture<DeletedRecords>> values = results.lowWatermarks();
+            KafkaFuture<DeletedRecords> myTopicPartition0Result = values.get(myTopicPartition0);
+            long lowWatermark = myTopicPartition0Result.get().lowWatermark();
+            assertEquals(lowWatermark, 3);
+
+            // "offset out of range" failure on records deletion for partition 1
+            KafkaFuture<DeletedRecords> myTopicPartition1Result = values.get(myTopicPartition1);
+            try {
+                myTopicPartition1Result.get();
+                fail("get() should throw ExecutionException");
+            } catch (ExecutionException e0) {
+                assertTrue(e0.getCause() instanceof OffsetOutOfRangeException);
+            }
+
+            // "leader not available" failure on metadata request for partition 2
+            KafkaFuture<DeletedRecords> myTopicPartition2Result = values.get(myTopicPartition2);
+            try {
+                myTopicPartition2Result.get();
+                fail("get() should throw ExecutionException");
+            } catch (ExecutionException e1) {
+                assertTrue(e1.getCause() instanceof LeaderNotAvailableException);
+            }
+
+            // "not leader for partition" failure on records deletion for partition 3
+            KafkaFuture<DeletedRecords> myTopicPartition3Result = values.get(myTopicPartition3);
+            try {
+                myTopicPartition3Result.get();
+                fail("get() should throw ExecutionException");
+            } catch (ExecutionException e1) {
+                assertTrue(e1.getCause() instanceof NotLeaderForPartitionException);
+            }
+
+            // "unknown topic or partition" failure on records deletion for partition 4
+            KafkaFuture<DeletedRecords> myTopicPartition4Result = values.get(myTopicPartition4);
+            try {
+                myTopicPartition4Result.get();
+                fail("get() should throw ExecutionException");
+            } catch (ExecutionException e1) {
+                assertTrue(e1.getCause() instanceof UnknownTopicOrPartitionException);
+            }
+        }
+    }
+
     private static <T> void assertCollectionIs(Collection<T> collection, T... elements) {
         for (T element : elements) {
             assertTrue("Did not find " + element, collection.contains(element));

http://git-wip-us.apache.org/repos/asf/kafka/blob/53c5ccb3/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
index d92ca30..1490a82 100644
--- a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
@@ -25,26 +25,28 @@ import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}
 
 import org.apache.kafka.clients.admin.KafkaAdminClientTest
 import org.apache.kafka.common.utils.{Time, Utils}
-import kafka.integration.KafkaServerTestHarness
 import kafka.log.LogConfig
 import kafka.server.{Defaults, KafkaConfig, KafkaServer}
 import org.apache.kafka.clients.admin._
 import kafka.utils.{Logging, TestUtils, ZkUtils}
 import kafka.utils.Implicits._
 import org.apache.kafka.clients.admin.NewTopic
+import org.apache.kafka.clients.consumer.KafkaConsumer
+import org.apache.kafka.clients.producer.KafkaProducer
 import org.apache.kafka.clients.producer.ProducerRecord
 import org.apache.kafka.common.{KafkaFuture, TopicPartition, TopicPartitionReplica}
 import org.apache.kafka.common.acl._
 import org.apache.kafka.common.config.ConfigResource
 import org.apache.kafka.common.errors._
 import org.junit.{After, Before, Rule, Test}
-import org.apache.kafka.common.requests.MetadataResponse
+import org.apache.kafka.common.requests.{DeleteRecordsRequest, MetadataResponse}
 import org.apache.kafka.common.resource.{Resource, ResourceType}
 import org.junit.rules.Timeout
 import org.junit.Assert._
 
 import scala.util.Random
 import scala.collection.JavaConverters._
+import java.lang.{Long => JLong}
 import scala.concurrent.duration.Duration
 import scala.concurrent.{Await, Future}
 
@@ -53,7 +55,7 @@ import scala.concurrent.{Await, Future}
  *
  * Also see {@link org.apache.kafka.clients.admin.KafkaAdminClientTest} for a unit test of the admin client.
  */
-class AdminClientIntegrationTest extends KafkaServerTestHarness with Logging {
+class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
 
   import AdminClientIntegrationTest._
 
@@ -62,6 +64,10 @@ class AdminClientIntegrationTest extends KafkaServerTestHarness with Logging {
 
   var client: AdminClient = null
 
+  val topic = "topic"
+  val partition = 0
+  val topicPartition = new TopicPartition(topic, partition)
+
   @Before
   override def setUp(): Unit = {
     super.setUp
@@ -75,11 +81,12 @@ class AdminClientIntegrationTest extends KafkaServerTestHarness with Logging {
     super.tearDown()
   }
 
-  val brokerCount = 3
-  lazy val serverConfig = new Properties
+  val serverCount = 3
+  val consumerCount = 1
+  val producerCount = 1
 
   override def generateConfigs = {
-    val cfgs = TestUtils.createBrokerConfigs(brokerCount, zkConnect, interBrokerSecurityProtocol = Some(securityProtocol),
+    val cfgs = TestUtils.createBrokerConfigs(serverCount, zkConnect, interBrokerSecurityProtocol = Some(securityProtocol),
       trustStoreFile = trustStoreFile, saslProperties = serverSaslProperties, logDirCount = 2)
     cfgs.foreach { config =>
       config.setProperty(KafkaConfig.ListenersProp, s"${listenerName.value}://localhost:${TestUtils.RandomPort}")
@@ -193,7 +200,7 @@ class AdminClientIntegrationTest extends KafkaServerTestHarness with Logging {
       assertEquals(3, partition.replicas.size)
       partition.replicas.asScala.foreach { replica =>
         assertTrue(replica.id >= 0)
-        assertTrue(replica.id < brokerCount)
+        assertTrue(replica.id < serverCount)
       }
       assertEquals("No duplicate replica ids", partition.replicas.size, partition.replicas.asScala.map(_.id).distinct.size)
 
@@ -247,10 +254,10 @@ class AdminClientIntegrationTest extends KafkaServerTestHarness with Logging {
     val topic = "topic"
     val leaderByPartition = TestUtils.createTopic(zkUtils, topic, 10, 1, servers, new Properties())
     val partitionsByBroker = leaderByPartition.groupBy { case (partitionId, leaderId) => leaderId }.mapValues(_.keys.toSeq)
-    val brokers = (0 until brokerCount).map(Integer.valueOf)
+    val brokers = (0 until serverCount).map(Integer.valueOf)
     val logDirInfosByBroker = client.describeLogDirs(brokers.asJava).all.get
 
-    (0 until brokerCount).foreach { brokerId =>
+    (0 until serverCount).foreach { brokerId =>
       val server = servers.find(_.config.brokerId == brokerId).get
       val expectedPartitions = partitionsByBroker(brokerId)
       val logDirInfos = logDirInfosByBroker.get(brokerId)
@@ -307,7 +314,7 @@ class AdminClientIntegrationTest extends KafkaServerTestHarness with Logging {
       assertTrue(exception.getCause.isInstanceOf[ReplicaNotAvailableException])
     }
 
-    TestUtils.createTopic(zkUtils, topic, 1, brokerCount, servers, new Properties)
+    TestUtils.createTopic(zkUtils, topic, 1, serverCount, servers, new Properties)
     servers.foreach { server =>
       val logDir = server.logManager.getLog(tp).get.dir.getParent
       assertEquals(firstReplicaAssignment(new TopicPartitionReplica(topic, 0, server.config.brokerId)), logDir)
@@ -699,6 +706,133 @@ class AdminClientIntegrationTest extends KafkaServerTestHarness with Logging {
   }
 
   @Test
+  def testSeekAfterDeleteRecords(): Unit = {
+    TestUtils.createTopic(zkUtils, topic, 2, serverCount, servers)
+
+    client = AdminClient.create(createConfig)
+
+    val consumer = consumers.head
+    subscribeAndWaitForAssignment(topic, consumer)
+
+    sendRecords(producers.head, 10, topicPartition)
+    consumer.seekToBeginning(Collections.singleton(topicPartition))
+    assertEquals(0L, consumer.position(topicPartition))
+
+    val result = client.deleteRecords(Map(topicPartition -> RecordsToDelete.beforeOffset(5L)).asJava)
+    val lowWatermark = result.lowWatermarks().get(topicPartition).get().lowWatermark()
+    assertEquals(5L, lowWatermark)
+
+    consumer.seekToBeginning(Collections.singletonList(topicPartition))
+    assertEquals(5L, consumer.position(topicPartition))
+
+    consumer.seek(topicPartition, 7L)
+    assertEquals(7L, consumer.position(topicPartition))
+
+    client.close()
+  }
+
+  @Test
+  def testLogStartOffsetCheckpoint(): Unit = {
+    TestUtils.createTopic(zkUtils, topic, 2, serverCount, servers)
+
+    client = AdminClient.create(createConfig)
+
+    subscribeAndWaitForAssignment(topic, consumers.head)
+
+    sendRecords(producers.head, 10, topicPartition)
+    var result = client.deleteRecords(Map(topicPartition -> RecordsToDelete.beforeOffset(5L)).asJava)
+    var lowWatermark = result.lowWatermarks().get(topicPartition).get().lowWatermark()
+    assertEquals(5L, lowWatermark)
+
+    for (i <- 0 until serverCount) {
+      killBroker(i)
+    }
+    restartDeadBrokers()
+
+    client.close()
+    brokerList = TestUtils.bootstrapServers(servers, listenerName)
+    client = AdminClient.create(createConfig)
+
+    TestUtils.waitUntilTrue(() => {
+      // Need to retry if leader is not available for the partition
+      result = client.deleteRecords(Map(topicPartition -> RecordsToDelete.beforeOffset(0L)).asJava)
+
+      val future = result.lowWatermarks().get(topicPartition)
+      try {
+        lowWatermark = future.get(1000L, TimeUnit.MILLISECONDS).lowWatermark()
+        lowWatermark == 5L
+      } catch {
+        case e: LeaderNotAvailableException => false
+      }
+
+    }, "Expected low watermark of the partition to be 5L")
+
+    client.close()
+  }
+
+  @Test
+  def testLogStartOffsetAfterDeleteRecords(): Unit = {
+    TestUtils.createTopic(zkUtils, topic, 2, serverCount, servers)
+
+    client = AdminClient.create(createConfig)
+
+    subscribeAndWaitForAssignment(topic, consumers.head)
+
+    sendRecords(producers.head, 10, topicPartition)
+    val result = client.deleteRecords(Map(topicPartition -> RecordsToDelete.beforeOffset(3L)).asJava)
+    val lowWatermark = result.lowWatermarks().get(topicPartition).get().lowWatermark()
+    assertEquals(3L, lowWatermark)
+
+    for (i <- 0 until serverCount)
+      assertEquals(3, servers(i).replicaManager.getReplica(topicPartition).get.logStartOffset)
+
+    client.close()
+  }
+
+  @Test
+  def testOffsetsForTimesAfterDeleteRecords(): Unit = {
+    TestUtils.createTopic(zkUtils, topic, 2, serverCount, servers)
+
+    client = AdminClient.create(createConfig)
+
+    val consumer = consumers.head
+    subscribeAndWaitForAssignment(topic, consumer)
+
+    sendRecords(producers.head, 10, topicPartition)
+    assertEquals(0L, consumer.offsetsForTimes(Map(topicPartition -> new JLong(0L)).asJava).get(topicPartition).offset())
+
+    var result = client.deleteRecords(Map(topicPartition -> RecordsToDelete.beforeOffset(5L)).asJava)
+    result.all().get()
+    assertEquals(5L, consumer.offsetsForTimes(Map(topicPartition -> new JLong(0L)).asJava).get(topicPartition).offset())
+
+    result = client.deleteRecords(Map(topicPartition -> RecordsToDelete.beforeOffset(DeleteRecordsRequest.HIGH_WATERMARK)).asJava)
+    result.all().get()
+    assertNull(consumer.offsetsForTimes(Map(topicPartition -> new JLong(0L)).asJava).get(topicPartition))
+
+    client.close()
+  }
+
+  private def subscribeAndWaitForAssignment(topic: String, consumer: KafkaConsumer[Array[Byte], Array[Byte]]): Unit = {
+    consumer.subscribe(Collections.singletonList(topic))
+    TestUtils.waitUntilTrue(() => {
+      consumer.poll(0)
+      !consumer.assignment.isEmpty
+    }, "Expected non-empty assignment")
+  }
+
+  private def sendRecords(producer: KafkaProducer[Array[Byte], Array[Byte]],
+                          numRecords: Int,
+                          topicPartition: TopicPartition): Unit = {
+    val futures = (0 until numRecords).map( i => {
+      val record = new ProducerRecord(topicPartition.topic, topicPartition.partition, s"$i".getBytes, s"$i".getBytes)
+      debug(s"Sending this record: $record")
+      producer.send(record)
+    })
+
+    futures.foreach(_.get)
+  }
+
+  @Test
   def testInvalidAlterConfigs(): Unit = {
     client = AdminClient.create(createConfig)
     checkInvalidAlterConfigs(zkUtils, servers, client)