You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by rs...@apache.org on 2021/02/12 20:27:42 UTC

[kafka] branch 2.8 updated: MINOR: Revert AdminClient changes for DeleteTopics (#10121)

This is an automated email from the ASF dual-hosted git repository.

rsivaram pushed a commit to branch 2.8
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.8 by this push:
     new f4e475c  MINOR: Revert AdminClient changes for DeleteTopics (#10121)
f4e475c is described below

commit f4e475c2cdacc727567faa89d7ff11b932c6627b
Author: Justine Olshan <jo...@confluent.io>
AuthorDate: Fri Feb 12 15:26:02 2021 -0500

    MINOR: Revert AdminClient changes for DeleteTopics (#10121)
    
    This PR removes the AdminClient changes pertaining to deleteTopicsWithIds and DeleteTopicsWithIdsResult in the 2.8 branch.
    
    Reviewers: Chia-Ping Tsai <ch...@gmail.com>, Rajini Sivaram <ra...@googlemail.com>
---
 .../java/org/apache/kafka/clients/admin/Admin.java |  36 -----
 .../clients/admin/DeleteTopicsWithIdsResult.java   |  54 --------
 .../kafka/clients/admin/KafkaAdminClient.java      | 102 --------------
 .../kafka/clients/admin/KafkaAdminClientTest.java  | 152 ---------------------
 .../kafka/clients/admin/MockAdminClient.java       |  31 -----
 .../kafka/api/PlaintextAdminIntegrationTest.scala  |  19 ---
 6 files changed, 394 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java b/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
index d732cd2..e3f3e48 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
@@ -33,7 +33,6 @@ import org.apache.kafka.common.KafkaFuture;
 import org.apache.kafka.common.Metric;
 import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.TopicPartitionReplica;
 import org.apache.kafka.common.acl.AclBinding;
 import org.apache.kafka.common.acl.AclBindingFilter;
@@ -252,41 +251,6 @@ public interface Admin extends AutoCloseable {
      * @return The DeleteTopicsResult.
      */
     DeleteTopicsResult deleteTopics(Collection<String> topics, DeleteTopicsOptions options);
-    
-    /**
-     * This is a convenience method for {@link #deleteTopicsWithIds(Collection, DeleteTopicsOptions)}
-     * with default options. See the overload for more details.
-     * <p>
-     * This operation is supported by brokers with version 2.8.0 or higher.
-     *
-     * @param topics The topic IDs for the topics to delete.
-     * @return The DeleteTopicsWithIdsResult.
-     */
-    default DeleteTopicsWithIdsResult deleteTopicsWithIds(Collection<Uuid> topics) {
-        return deleteTopicsWithIds(topics, new DeleteTopicsOptions());
-    }
-
-    /**
-     * Delete a batch of topics.
-     * <p>
-     * This operation is not transactional so it may succeed for some topics while fail for others.
-     * <p>
-     * It may take several seconds after the {@link DeleteTopicsWithIdsResult} returns
-     * success for all the brokers to become aware that the topics are gone.
-     * During this time, {@link #listTopics()} and {@link #describeTopics(Collection)}
-     * may continue to return information about the deleted topics.
-     * <p>
-     * If delete.topic.enable is false on the brokers, deleteTopicsWithIds will mark
-     * the topics for deletion, but not actually delete them. The futures will
-     * return successfully in this case.
-     * <p>
-     * This operation is supported by brokers with version 2.8.0 or higher.
-     *
-     * @param topics  The topic IDs for the topics to delete.
-     * @param options The options to use when deleting the topics.
-     * @return The DeleteTopicsWithIdsResult.
-     */
-    DeleteTopicsWithIdsResult deleteTopicsWithIds(Collection<Uuid> topics, DeleteTopicsOptions options);
 
     /**
      * List the topics available in the cluster with the default options.
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DeleteTopicsWithIdsResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/DeleteTopicsWithIdsResult.java
deleted file mode 100644
index eeb9119..0000000
--- a/clients/src/main/java/org/apache/kafka/clients/admin/DeleteTopicsWithIdsResult.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.clients.admin;
-
-import org.apache.kafka.common.KafkaFuture;
-import org.apache.kafka.common.Uuid;
-import org.apache.kafka.common.annotation.InterfaceStability;
-
-import java.util.Collection;
-import java.util.Map;
-
-/**
- * The result of the {@link Admin#deleteTopicsWithIds(Collection)} call.
- *
- * The API of this class is evolving, see {@link Admin} for details.
- */
-@InterfaceStability.Evolving
-public class DeleteTopicsWithIdsResult {
-    final Map<Uuid, KafkaFuture<Void>> futures;
-
-    DeleteTopicsWithIdsResult(Map<Uuid, KafkaFuture<Void>> futures) {
-        this.futures = futures;
-    }
-
-    /**
-     * Return a map from topic IDs to futures which can be used to check the status of
-     * individual deletions.
-     */
-    public Map<Uuid, KafkaFuture<Void>> values() {
-        return futures;
-    }
-
-    /**
-     * Return a future which succeeds only if all the topic deletions succeed.
-     */
-    public KafkaFuture<Void> all() {
-        return KafkaFuture.allOf(futures.values().toArray(new KafkaFuture[0]));
-    }
-}
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index ab40470..b2bfac3 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -49,7 +49,6 @@ import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.TopicPartitionInfo;
 import org.apache.kafka.common.TopicPartitionReplica;
-import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.acl.AclBinding;
 import org.apache.kafka.common.acl.AclBindingFilter;
 import org.apache.kafka.common.acl.AclOperation;
@@ -68,7 +67,6 @@ import org.apache.kafka.common.errors.ThrottlingQuotaExceededException;
 import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.errors.UnacceptableCredentialException;
 import org.apache.kafka.common.errors.UnknownServerException;
-import org.apache.kafka.common.errors.UnknownTopicIdException;
 import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
 import org.apache.kafka.common.errors.UnsupportedSaslMechanismException;
 import org.apache.kafka.common.errors.UnsupportedVersionException;
@@ -110,7 +108,6 @@ import org.apache.kafka.common.message.DeleteRecordsRequestData.DeleteRecordsTop
 import org.apache.kafka.common.message.DeleteRecordsResponseData;
 import org.apache.kafka.common.message.DeleteRecordsResponseData.DeleteRecordsTopicResult;
 import org.apache.kafka.common.message.DeleteTopicsRequestData;
-import org.apache.kafka.common.message.DeleteTopicsRequestData.DeleteTopicState;
 import org.apache.kafka.common.message.DeleteTopicsResponseData.DeletableTopicResult;
 import org.apache.kafka.common.message.DescribeClusterRequestData;
 import org.apache.kafka.common.message.DescribeConfigsRequestData;
@@ -1631,32 +1628,6 @@ public class KafkaAdminClient extends AdminClient {
         return new DeleteTopicsResult(new HashMap<>(topicFutures));
     }
 
-    @Override
-    public DeleteTopicsWithIdsResult deleteTopicsWithIds(final Collection<Uuid> topicIds,
-                                                         final DeleteTopicsOptions options) {
-        final Map<Uuid, KafkaFutureImpl<Void>> topicFutures = new HashMap<>(topicIds.size());
-        final List<Uuid> validTopicIds = new ArrayList<>(topicIds.size());
-        for (Uuid topicId : topicIds) {
-            if (topicId.equals(Uuid.ZERO_UUID)) {
-                KafkaFutureImpl<Void> future = new KafkaFutureImpl<>();
-                future.completeExceptionally(new UnknownTopicIdException("The given topic ID '" +
-                        topicId + "' cannot be represented in a request."));
-                topicFutures.put(topicId, future);
-            } else if (!topicFutures.containsKey(topicId)) {
-                topicFutures.put(topicId, new KafkaFutureImpl<>());
-                validTopicIds.add(topicId);
-            }
-        }
-        if (!validTopicIds.isEmpty()) {
-            final long now = time.milliseconds();
-            final long deadline = calcDeadlineMs(now, options.timeoutMs());
-            final Call call = getDeleteTopicsWithIdsCall(options, topicFutures, validTopicIds,
-                    Collections.emptyMap(), now, deadline);
-            runnable.call(call, now);
-        }
-        return new DeleteTopicsWithIdsResult(new HashMap<>(topicFutures));
-    }
-
     private Call getDeleteTopicsCall(final DeleteTopicsOptions options,
                                      final Map<String, KafkaFutureImpl<Void>> futures,
                                      final List<String> topics,
@@ -1728,79 +1699,6 @@ public class KafkaAdminClient extends AdminClient {
             }
         };
     }
-   
-    private Call getDeleteTopicsWithIdsCall(final DeleteTopicsOptions options,
-                                            final Map<Uuid, KafkaFutureImpl<Void>> futures,
-                                            final List<Uuid> topicIds,
-                                            final Map<Uuid, ThrottlingQuotaExceededException> quotaExceededExceptions,
-                                            final long now,
-                                            final long deadline) {
-        return new Call("deleteTopics", deadline, new ControllerNodeProvider()) {
-            @Override
-            DeleteTopicsRequest.Builder createRequest(int timeoutMs) {
-                return new DeleteTopicsRequest.Builder(
-                        new DeleteTopicsRequestData()
-                                .setTopics(topicIds.stream().map(
-                                    topic -> new DeleteTopicState().setTopicId(topic)).collect(Collectors.toList()))
-                                .setTimeoutMs(timeoutMs));
-            }
-
-            @Override
-            void handleResponse(AbstractResponse abstractResponse) {
-                // Check for controller change
-                handleNotControllerError(abstractResponse);
-                // Handle server responses for particular topics.
-                final DeleteTopicsResponse response = (DeleteTopicsResponse) abstractResponse;
-                final List<Uuid> retryTopics = new ArrayList<>();
-                final Map<Uuid, ThrottlingQuotaExceededException> retryTopicQuotaExceededExceptions = new HashMap<>();
-                for (DeletableTopicResult result : response.data().responses()) {
-                    KafkaFutureImpl<Void> future = futures.get(result.topicId());
-                    if (future == null) {
-                        log.warn("Server response mentioned unknown topic ID {}", result.topicId());
-                    } else {
-                        ApiError error = new ApiError(result.errorCode(), result.errorMessage());
-                        if (error.isFailure()) {
-                            if (error.is(Errors.THROTTLING_QUOTA_EXCEEDED)) {
-                                ThrottlingQuotaExceededException quotaExceededException = new ThrottlingQuotaExceededException(
-                                        response.throttleTimeMs(), error.messageWithFallback());
-                                if (options.shouldRetryOnQuotaViolation()) {
-                                    retryTopics.add(result.topicId());
-                                    retryTopicQuotaExceededExceptions.put(result.topicId(), quotaExceededException);
-                                } else {
-                                    future.completeExceptionally(quotaExceededException);
-                                }
-                            } else {
-                                future.completeExceptionally(error.exception());
-                            }
-                        } else {
-                            future.complete(null);
-                        }
-                    }
-                }
-                // If there are topics to retry, retry them; complete unrealized futures otherwise.
-                if (retryTopics.isEmpty()) {
-                    // The server should send back a response for every topic. But do a sanity check anyway.
-                    completeUnrealizedFutures(futures.entrySet().stream(),
-                        topic -> "The controller response did not contain a result for topic " + topic);
-                } else {
-                    final long now = time.milliseconds();
-                    final Call call = getDeleteTopicsWithIdsCall(options, futures, retryTopics,
-                            retryTopicQuotaExceededExceptions, now, deadline);
-                    runnable.call(call, now);
-                }
-            }
-
-            @Override
-            void handleFailure(Throwable throwable) {
-                // If there were any topics retries due to a quota exceeded exception, we propagate
-                // the initial error back to the caller if the request timed out.
-                maybeCompleteQuotaExceededException(options.shouldRetryOnQuotaViolation(),
-                        throwable, futures, quotaExceededExceptions, (int) (time.milliseconds() - now));
-                // Fail all the other remaining futures
-                completeAllExceptionally(futures.values(), throwable);
-            }
-        };
-    }
 
     @Override
     public ListTopicsResult listTopics(final ListTopicsOptions options) {
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index a5296da..ddd01be 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -65,7 +65,6 @@ import org.apache.kafka.common.errors.TopicDeletionDisabledException;
 import org.apache.kafka.common.errors.TopicExistsException;
 import org.apache.kafka.common.errors.UnknownMemberIdException;
 import org.apache.kafka.common.errors.UnknownServerException;
-import org.apache.kafka.common.errors.UnknownTopicIdException;
 import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
 import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.feature.Features;
@@ -413,12 +412,6 @@ public class KafkaAdminClientTest {
             .setErrorCode(error.code());
     }
 
-    public static DeletableTopicResult deletableTopicResultWithId(Uuid topicId, Errors error) {
-        return new DeletableTopicResult()
-                .setTopicId(topicId)
-                .setErrorCode(error.code());
-    }
-
     public static CreatePartitionsResponse prepareCreatePartitionsResponse(int throttleTimeMs, CreatePartitionsTopicResult... topics) {
         CreatePartitionsResponseData data = new CreatePartitionsResponseData()
             .setThrottleTimeMs(throttleTimeMs)
@@ -445,14 +438,6 @@ public class KafkaAdminClientTest {
         return new DeleteTopicsResponse(data);
     }
 
-    private static DeleteTopicsResponse prepareDeleteTopicsResponseWithTopicId(Uuid id, Errors error) {
-        DeleteTopicsResponseData data = new DeleteTopicsResponseData();
-        data.responses().add(new DeletableTopicResult()
-                .setTopicId(id)
-                .setErrorCode(error.code()));
-        return new DeleteTopicsResponse(data);
-    }
-
     private static FindCoordinatorResponse prepareFindCoordinatorResponse(Errors error, Node node) {
         return FindCoordinatorResponse.prepareResponse(error, node);
     }
@@ -886,33 +871,8 @@ public class KafkaAdminClientTest {
             future = env.adminClient().deleteTopics(singletonList("myTopic"),
                 new DeleteTopicsOptions()).all();
             TestUtils.assertFutureError(future, UnknownTopicOrPartitionException.class);
-            
-            // With topic IDs
-            Uuid topicId = Uuid.randomUuid();
-
-            env.kafkaClient().prepareResponse(
-                    expectDeleteTopicsRequestWithTopicIds(topicId),
-                    prepareDeleteTopicsResponseWithTopicId(topicId, Errors.NONE));
-            future = env.adminClient().deleteTopicsWithIds(singletonList(topicId),
-                    new DeleteTopicsOptions()).all();
-            assertNull(future.get());
-
-            env.kafkaClient().prepareResponse(
-                    expectDeleteTopicsRequestWithTopicIds(topicId),
-                    prepareDeleteTopicsResponseWithTopicId(topicId, Errors.TOPIC_DELETION_DISABLED));
-            future = env.adminClient().deleteTopicsWithIds(singletonList(topicId),
-                    new DeleteTopicsOptions()).all();
-            TestUtils.assertFutureError(future, TopicDeletionDisabledException.class);
-
-            env.kafkaClient().prepareResponse(
-                    expectDeleteTopicsRequestWithTopicIds(topicId),
-                    prepareDeleteTopicsResponseWithTopicId(topicId, Errors.UNKNOWN_TOPIC_ID));
-            future = env.adminClient().deleteTopicsWithIds(singletonList(topicId),
-                    new DeleteTopicsOptions()).all();
-            TestUtils.assertFutureError(future, UnknownTopicIdException.class);
         }
     }
-    
 
     @Test
     public void testDeleteTopicsPartialResponse() throws Exception {
@@ -929,20 +889,6 @@ public class KafkaAdminClientTest {
 
             result.values().get("myTopic").get();
             TestUtils.assertFutureThrows(result.values().get("myOtherTopic"), ApiException.class);
-            
-            // With topic IDs
-            Uuid topicId1 = Uuid.randomUuid();
-            Uuid topicId2 = Uuid.randomUuid();
-            env.kafkaClient().prepareResponse(
-                    expectDeleteTopicsRequestWithTopicIds(topicId1, topicId2),
-                    prepareDeleteTopicsResponse(1000,
-                            deletableTopicResultWithId(topicId1, Errors.NONE)));
-
-            DeleteTopicsWithIdsResult resultIds = env.adminClient().deleteTopicsWithIds(
-                    asList(topicId1, topicId2), new DeleteTopicsOptions());
-
-            resultIds.values().get(topicId1).get();
-            TestUtils.assertFutureThrows(resultIds.values().get(topicId2), ApiException.class);
         }
     }
 
@@ -975,36 +921,6 @@ public class KafkaAdminClientTest {
             assertNull(result.values().get("topic1").get());
             assertNull(result.values().get("topic2").get());
             TestUtils.assertFutureThrows(result.values().get("topic3"), TopicExistsException.class);
-            
-            // With topic IDs
-            Uuid topicId1 = Uuid.randomUuid();
-            Uuid topicId2 = Uuid.randomUuid();
-            Uuid topicId3 = Uuid.randomUuid();
-            
-            env.kafkaClient().prepareResponse(
-                    expectDeleteTopicsRequestWithTopicIds(topicId1, topicId2, topicId3),
-                    prepareDeleteTopicsResponse(1000,
-                            deletableTopicResultWithId(topicId1, Errors.NONE),
-                            deletableTopicResultWithId(topicId2, Errors.THROTTLING_QUOTA_EXCEEDED),
-                            deletableTopicResultWithId(topicId3, Errors.UNKNOWN_TOPIC_ID)));
-
-            env.kafkaClient().prepareResponse(
-                    expectDeleteTopicsRequestWithTopicIds(topicId2),
-                    prepareDeleteTopicsResponse(1000,
-                            deletableTopicResultWithId(topicId2, Errors.THROTTLING_QUOTA_EXCEEDED)));
-
-            env.kafkaClient().prepareResponse(
-                    expectDeleteTopicsRequestWithTopicIds(topicId2),
-                    prepareDeleteTopicsResponse(0,
-                            deletableTopicResultWithId(topicId2, Errors.NONE)));
-
-            DeleteTopicsWithIdsResult resultIds = env.adminClient().deleteTopicsWithIds(
-                    asList(topicId1, topicId2, topicId3),
-                    new DeleteTopicsOptions().retryOnQuotaViolation(true));
-
-            assertNull(resultIds.values().get(topicId1).get());
-            assertNull(resultIds.values().get(topicId2).get());
-            TestUtils.assertFutureThrows(resultIds.values().get(topicId3), UnknownTopicIdException.class);
         }
     }
 
@@ -1050,43 +966,6 @@ public class KafkaAdminClientTest {
                 ThrottlingQuotaExceededException.class);
             assertEquals(0, e.throttleTimeMs());
             TestUtils.assertFutureThrows(result.values().get("topic3"), TopicExistsException.class);
-            
-            // With topic IDs
-            Uuid topicId1 = Uuid.randomUuid();
-            Uuid topicId2 = Uuid.randomUuid();
-            Uuid topicId3 = Uuid.randomUuid();
-            env.kafkaClient().prepareResponse(
-                    expectDeleteTopicsRequestWithTopicIds(topicId1, topicId2, topicId3),
-                    prepareDeleteTopicsResponse(1000,
-                            deletableTopicResultWithId(topicId1, Errors.NONE),
-                            deletableTopicResultWithId(topicId2, Errors.THROTTLING_QUOTA_EXCEEDED),
-                            deletableTopicResultWithId(topicId3, Errors.UNKNOWN_TOPIC_ID)));
-
-            env.kafkaClient().prepareResponse(
-                    expectDeleteTopicsRequestWithTopicIds(topicId2),
-                    prepareDeleteTopicsResponse(1000,
-                            deletableTopicResultWithId(topicId2, Errors.THROTTLING_QUOTA_EXCEEDED)));
-
-            DeleteTopicsWithIdsResult resultIds = env.adminClient().deleteTopicsWithIds(
-                    asList(topicId1, topicId2, topicId3),
-                    new DeleteTopicsOptions().retryOnQuotaViolation(true));
-
-            // Wait until the prepared attempts have consumed
-            TestUtils.waitForCondition(() -> env.kafkaClient().numAwaitingResponses() == 0,
-                    "Failed awaiting DeleteTopics requests");
-
-            // Wait until the next request is sent out
-            TestUtils.waitForCondition(() -> env.kafkaClient().inFlightRequestCount() == 1,
-                    "Failed awaiting next DeleteTopics request");
-
-            // Advance time past the default api timeout to time out the inflight request
-            time.sleep(defaultApiTimeout + 1);
-
-            assertNull(resultIds.values().get(topicId1).get());
-            e = TestUtils.assertFutureThrows(resultIds.values().get(topicId2),
-                    ThrottlingQuotaExceededException.class);
-            assertEquals(0, e.throttleTimeMs());
-            TestUtils.assertFutureThrows(resultIds.values().get(topicId3), UnknownTopicIdException.class);
         }
     }
 
@@ -1111,27 +990,6 @@ public class KafkaAdminClientTest {
                 ThrottlingQuotaExceededException.class);
             assertEquals(1000, e.throttleTimeMs());
             TestUtils.assertFutureError(result.values().get("topic3"), TopicExistsException.class);
-
-            // With topic IDs
-            Uuid topicId1 = Uuid.randomUuid();
-            Uuid topicId2 = Uuid.randomUuid();
-            Uuid topicId3 = Uuid.randomUuid();
-            env.kafkaClient().prepareResponse(
-                    expectDeleteTopicsRequestWithTopicIds(topicId1, topicId2, topicId3),
-                    prepareDeleteTopicsResponse(1000,
-                            deletableTopicResultWithId(topicId1, Errors.NONE),
-                            deletableTopicResultWithId(topicId2, Errors.THROTTLING_QUOTA_EXCEEDED),
-                            deletableTopicResultWithId(topicId3, Errors.UNKNOWN_TOPIC_ID)));
-
-            DeleteTopicsWithIdsResult resultIds = env.adminClient().deleteTopicsWithIds(
-                    asList(topicId1, topicId2, topicId3),
-                    new DeleteTopicsOptions().retryOnQuotaViolation(false));
-
-            assertNull(resultIds.values().get(topicId1).get());
-            e = TestUtils.assertFutureThrows(resultIds.values().get(topicId2),
-                    ThrottlingQuotaExceededException.class);
-            assertEquals(1000, e.throttleTimeMs());
-            TestUtils.assertFutureError(resultIds.values().get(topicId3), UnknownTopicIdException.class);
         }
     }
 
@@ -1145,16 +1003,6 @@ public class KafkaAdminClientTest {
         };
     }
 
-    private MockClient.RequestMatcher expectDeleteTopicsRequestWithTopicIds(final Uuid... topicIds) {
-        return body -> {
-            if (body instanceof DeleteTopicsRequest) {
-                DeleteTopicsRequest request = (DeleteTopicsRequest) body;
-                return request.topicIds().equals(Arrays.asList(topicIds));
-            }
-            return false;
-        };
-    }
-
     @Test
     public void testInvalidTopicNames() throws Exception {
         try (AdminClientUnitTestEnv env = mockClientEnv()) {
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java b/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
index c2b9cff..f4ccc98 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
@@ -423,37 +423,6 @@ public class MockAdminClient extends AdminClient {
     }
 
     @Override
-    synchronized public DeleteTopicsWithIdsResult deleteTopicsWithIds(Collection<Uuid> topicsToDelete, DeleteTopicsOptions options) {
-        Map<Uuid, KafkaFuture<Void>> deleteTopicsWithIdsResult = new HashMap<>();
-
-        if (timeoutNextRequests > 0) {
-            for (final Uuid topicId : topicsToDelete) {
-                KafkaFutureImpl<Void> future = new KafkaFutureImpl<>();
-                future.completeExceptionally(new TimeoutException());
-                deleteTopicsWithIdsResult.put(topicId, future);
-            }
-
-            --timeoutNextRequests;
-            return new DeleteTopicsWithIdsResult(deleteTopicsWithIdsResult);
-        }
-
-        for (final Uuid topicId : topicsToDelete) {
-            KafkaFutureImpl<Void> future = new KafkaFutureImpl<>();
-
-            String name = topicNames.remove(topicId);
-            if (name == null || allTopics.remove(name) == null) {
-                future.completeExceptionally(new UnknownTopicOrPartitionException(String.format("Topic %s does not exist.", topicId)));
-            } else {
-                topicIds.remove(name);
-                future.complete(null);
-            }
-            deleteTopicsWithIdsResult.put(topicId, future);
-        }
-
-        return new DeleteTopicsWithIdsResult(deleteTopicsWithIdsResult);
-    }
-
-    @Override
     synchronized public CreatePartitionsResult createPartitions(Map<String, NewPartitions> newPartitions, CreatePartitionsOptions options) {
         throw new UnsupportedOperationException("Not implemented yet");
     }
diff --git a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
index 87cb257..049ef67 100644
--- a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
@@ -48,7 +48,6 @@ import org.slf4j.LoggerFactory
 import scala.annotation.nowarn
 import scala.jdk.CollectionConverters._
 import scala.collection.Seq
-import scala.compat.java8.OptionConverters._
 import scala.concurrent.duration.Duration
 import scala.concurrent.{Await, Future}
 import scala.util.Random
@@ -117,24 +116,6 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
   }
 
   @Test
-  def testDeleteTopicsWithIds(): Unit = {
-    client = Admin.create(createConfig)
-    val topics = Seq("mytopic", "mytopic2", "mytopic3")
-    val newTopics = Seq(
-      new NewTopic("mytopic", Map((0: Integer) -> Seq[Integer](1, 2).asJava, (1: Integer) -> Seq[Integer](2, 0).asJava).asJava),
-      new NewTopic("mytopic2", 3, 3.toShort),
-      new NewTopic("mytopic3", Option.empty[Integer].asJava, Option.empty[java.lang.Short].asJava)
-    )
-    val createResult = client.createTopics(newTopics.asJava)
-    createResult.all.get()
-    waitForTopics(client, topics, List())
-    val topicIds = getTopicIds().values.toSet
-
-    client.deleteTopicsWithIds(topicIds.asJava).all.get()
-    waitForTopics(client, List(), topics)
-  }
-
-  @Test
   def testMetadataRefresh(): Unit = {
     client = Admin.create(createConfig)
     val topics = Seq("mytopic")