You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by rs...@apache.org on 2021/02/12 20:27:42 UTC
[kafka] branch 2.8 updated: MINOR: Revert AdminClient changes for DeleteTopics (#10121)
This is an automated email from the ASF dual-hosted git repository.
rsivaram pushed a commit to branch 2.8
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.8 by this push:
new f4e475c MINOR: Revert AdminClient changes for DeleteTopics (#10121)
f4e475c is described below
commit f4e475c2cdacc727567faa89d7ff11b932c6627b
Author: Justine Olshan <jo...@confluent.io>
AuthorDate: Fri Feb 12 15:26:02 2021 -0500
MINOR: Revert AdminClient changes for DeleteTopics (#10121)
This PR removes the AdminClient changes pertaining to deleteTopicsWithIds and DeleteTopicsWithIdsResult in the 2.8 branch.
Reviewers: Chia-Ping Tsai <ch...@gmail.com>, Rajini Sivaram <ra...@googlemail.com>
---
.../java/org/apache/kafka/clients/admin/Admin.java | 36 -----
.../clients/admin/DeleteTopicsWithIdsResult.java | 54 --------
.../kafka/clients/admin/KafkaAdminClient.java | 102 --------------
.../kafka/clients/admin/KafkaAdminClientTest.java | 152 ---------------------
.../kafka/clients/admin/MockAdminClient.java | 31 -----
.../kafka/api/PlaintextAdminIntegrationTest.scala | 19 ---
6 files changed, 394 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java b/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
index d732cd2..e3f3e48 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
@@ -33,7 +33,6 @@ import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.TopicPartitionReplica;
import org.apache.kafka.common.acl.AclBinding;
import org.apache.kafka.common.acl.AclBindingFilter;
@@ -252,41 +251,6 @@ public interface Admin extends AutoCloseable {
* @return The DeleteTopicsResult.
*/
DeleteTopicsResult deleteTopics(Collection<String> topics, DeleteTopicsOptions options);
-
- /**
- * This is a convenience method for {@link #deleteTopicsWithIds(Collection, DeleteTopicsOptions)}
- * with default options. See the overload for more details.
- * <p>
- * This operation is supported by brokers with version 2.8.0 or higher.
- *
- * @param topics The topic IDs for the topics to delete.
- * @return The DeleteTopicsWithIdsResult.
- */
- default DeleteTopicsWithIdsResult deleteTopicsWithIds(Collection<Uuid> topics) {
- return deleteTopicsWithIds(topics, new DeleteTopicsOptions());
- }
-
- /**
- * Delete a batch of topics.
- * <p>
- * This operation is not transactional so it may succeed for some topics while fail for others.
- * <p>
- * It may take several seconds after the {@link DeleteTopicsWithIdsResult} returns
- * success for all the brokers to become aware that the topics are gone.
- * During this time, {@link #listTopics()} and {@link #describeTopics(Collection)}
- * may continue to return information about the deleted topics.
- * <p>
- * If delete.topic.enable is false on the brokers, deleteTopicsWithIds will mark
- * the topics for deletion, but not actually delete them. The futures will
- * return successfully in this case.
- * <p>
- * This operation is supported by brokers with version 2.8.0 or higher.
- *
- * @param topics The topic IDs for the topics to delete.
- * @param options The options to use when deleting the topics.
- * @return The DeleteTopicsWithIdsResult.
- */
- DeleteTopicsWithIdsResult deleteTopicsWithIds(Collection<Uuid> topics, DeleteTopicsOptions options);
/**
* List the topics available in the cluster with the default options.
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DeleteTopicsWithIdsResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/DeleteTopicsWithIdsResult.java
deleted file mode 100644
index eeb9119..0000000
--- a/clients/src/main/java/org/apache/kafka/clients/admin/DeleteTopicsWithIdsResult.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.clients.admin;
-
-import org.apache.kafka.common.KafkaFuture;
-import org.apache.kafka.common.Uuid;
-import org.apache.kafka.common.annotation.InterfaceStability;
-
-import java.util.Collection;
-import java.util.Map;
-
-/**
- * The result of the {@link Admin#deleteTopicsWithIds(Collection)} call.
- *
- * The API of this class is evolving, see {@link Admin} for details.
- */
-@InterfaceStability.Evolving
-public class DeleteTopicsWithIdsResult {
- final Map<Uuid, KafkaFuture<Void>> futures;
-
- DeleteTopicsWithIdsResult(Map<Uuid, KafkaFuture<Void>> futures) {
- this.futures = futures;
- }
-
- /**
- * Return a map from topic IDs to futures which can be used to check the status of
- * individual deletions.
- */
- public Map<Uuid, KafkaFuture<Void>> values() {
- return futures;
- }
-
- /**
- * Return a future which succeeds only if all the topic deletions succeed.
- */
- public KafkaFuture<Void> all() {
- return KafkaFuture.allOf(futures.values().toArray(new KafkaFuture[0]));
- }
-}
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index ab40470..b2bfac3 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -49,7 +49,6 @@ import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.TopicPartitionInfo;
import org.apache.kafka.common.TopicPartitionReplica;
-import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.acl.AclBinding;
import org.apache.kafka.common.acl.AclBindingFilter;
import org.apache.kafka.common.acl.AclOperation;
@@ -68,7 +67,6 @@ import org.apache.kafka.common.errors.ThrottlingQuotaExceededException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.UnacceptableCredentialException;
import org.apache.kafka.common.errors.UnknownServerException;
-import org.apache.kafka.common.errors.UnknownTopicIdException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.errors.UnsupportedSaslMechanismException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
@@ -110,7 +108,6 @@ import org.apache.kafka.common.message.DeleteRecordsRequestData.DeleteRecordsTop
import org.apache.kafka.common.message.DeleteRecordsResponseData;
import org.apache.kafka.common.message.DeleteRecordsResponseData.DeleteRecordsTopicResult;
import org.apache.kafka.common.message.DeleteTopicsRequestData;
-import org.apache.kafka.common.message.DeleteTopicsRequestData.DeleteTopicState;
import org.apache.kafka.common.message.DeleteTopicsResponseData.DeletableTopicResult;
import org.apache.kafka.common.message.DescribeClusterRequestData;
import org.apache.kafka.common.message.DescribeConfigsRequestData;
@@ -1631,32 +1628,6 @@ public class KafkaAdminClient extends AdminClient {
return new DeleteTopicsResult(new HashMap<>(topicFutures));
}
- @Override
- public DeleteTopicsWithIdsResult deleteTopicsWithIds(final Collection<Uuid> topicIds,
- final DeleteTopicsOptions options) {
- final Map<Uuid, KafkaFutureImpl<Void>> topicFutures = new HashMap<>(topicIds.size());
- final List<Uuid> validTopicIds = new ArrayList<>(topicIds.size());
- for (Uuid topicId : topicIds) {
- if (topicId.equals(Uuid.ZERO_UUID)) {
- KafkaFutureImpl<Void> future = new KafkaFutureImpl<>();
- future.completeExceptionally(new UnknownTopicIdException("The given topic ID '" +
- topicId + "' cannot be represented in a request."));
- topicFutures.put(topicId, future);
- } else if (!topicFutures.containsKey(topicId)) {
- topicFutures.put(topicId, new KafkaFutureImpl<>());
- validTopicIds.add(topicId);
- }
- }
- if (!validTopicIds.isEmpty()) {
- final long now = time.milliseconds();
- final long deadline = calcDeadlineMs(now, options.timeoutMs());
- final Call call = getDeleteTopicsWithIdsCall(options, topicFutures, validTopicIds,
- Collections.emptyMap(), now, deadline);
- runnable.call(call, now);
- }
- return new DeleteTopicsWithIdsResult(new HashMap<>(topicFutures));
- }
-
private Call getDeleteTopicsCall(final DeleteTopicsOptions options,
final Map<String, KafkaFutureImpl<Void>> futures,
final List<String> topics,
@@ -1728,79 +1699,6 @@ public class KafkaAdminClient extends AdminClient {
}
};
}
-
- private Call getDeleteTopicsWithIdsCall(final DeleteTopicsOptions options,
- final Map<Uuid, KafkaFutureImpl<Void>> futures,
- final List<Uuid> topicIds,
- final Map<Uuid, ThrottlingQuotaExceededException> quotaExceededExceptions,
- final long now,
- final long deadline) {
- return new Call("deleteTopics", deadline, new ControllerNodeProvider()) {
- @Override
- DeleteTopicsRequest.Builder createRequest(int timeoutMs) {
- return new DeleteTopicsRequest.Builder(
- new DeleteTopicsRequestData()
- .setTopics(topicIds.stream().map(
- topic -> new DeleteTopicState().setTopicId(topic)).collect(Collectors.toList()))
- .setTimeoutMs(timeoutMs));
- }
-
- @Override
- void handleResponse(AbstractResponse abstractResponse) {
- // Check for controller change
- handleNotControllerError(abstractResponse);
- // Handle server responses for particular topics.
- final DeleteTopicsResponse response = (DeleteTopicsResponse) abstractResponse;
- final List<Uuid> retryTopics = new ArrayList<>();
- final Map<Uuid, ThrottlingQuotaExceededException> retryTopicQuotaExceededExceptions = new HashMap<>();
- for (DeletableTopicResult result : response.data().responses()) {
- KafkaFutureImpl<Void> future = futures.get(result.topicId());
- if (future == null) {
- log.warn("Server response mentioned unknown topic ID {}", result.topicId());
- } else {
- ApiError error = new ApiError(result.errorCode(), result.errorMessage());
- if (error.isFailure()) {
- if (error.is(Errors.THROTTLING_QUOTA_EXCEEDED)) {
- ThrottlingQuotaExceededException quotaExceededException = new ThrottlingQuotaExceededException(
- response.throttleTimeMs(), error.messageWithFallback());
- if (options.shouldRetryOnQuotaViolation()) {
- retryTopics.add(result.topicId());
- retryTopicQuotaExceededExceptions.put(result.topicId(), quotaExceededException);
- } else {
- future.completeExceptionally(quotaExceededException);
- }
- } else {
- future.completeExceptionally(error.exception());
- }
- } else {
- future.complete(null);
- }
- }
- }
- // If there are topics to retry, retry them; complete unrealized futures otherwise.
- if (retryTopics.isEmpty()) {
- // The server should send back a response for every topic. But do a sanity check anyway.
- completeUnrealizedFutures(futures.entrySet().stream(),
- topic -> "The controller response did not contain a result for topic " + topic);
- } else {
- final long now = time.milliseconds();
- final Call call = getDeleteTopicsWithIdsCall(options, futures, retryTopics,
- retryTopicQuotaExceededExceptions, now, deadline);
- runnable.call(call, now);
- }
- }
-
- @Override
- void handleFailure(Throwable throwable) {
- // If there were any topics retries due to a quota exceeded exception, we propagate
- // the initial error back to the caller if the request timed out.
- maybeCompleteQuotaExceededException(options.shouldRetryOnQuotaViolation(),
- throwable, futures, quotaExceededExceptions, (int) (time.milliseconds() - now));
- // Fail all the other remaining futures
- completeAllExceptionally(futures.values(), throwable);
- }
- };
- }
@Override
public ListTopicsResult listTopics(final ListTopicsOptions options) {
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index a5296da..ddd01be 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -65,7 +65,6 @@ import org.apache.kafka.common.errors.TopicDeletionDisabledException;
import org.apache.kafka.common.errors.TopicExistsException;
import org.apache.kafka.common.errors.UnknownMemberIdException;
import org.apache.kafka.common.errors.UnknownServerException;
-import org.apache.kafka.common.errors.UnknownTopicIdException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.feature.Features;
@@ -413,12 +412,6 @@ public class KafkaAdminClientTest {
.setErrorCode(error.code());
}
- public static DeletableTopicResult deletableTopicResultWithId(Uuid topicId, Errors error) {
- return new DeletableTopicResult()
- .setTopicId(topicId)
- .setErrorCode(error.code());
- }
-
public static CreatePartitionsResponse prepareCreatePartitionsResponse(int throttleTimeMs, CreatePartitionsTopicResult... topics) {
CreatePartitionsResponseData data = new CreatePartitionsResponseData()
.setThrottleTimeMs(throttleTimeMs)
@@ -445,14 +438,6 @@ public class KafkaAdminClientTest {
return new DeleteTopicsResponse(data);
}
- private static DeleteTopicsResponse prepareDeleteTopicsResponseWithTopicId(Uuid id, Errors error) {
- DeleteTopicsResponseData data = new DeleteTopicsResponseData();
- data.responses().add(new DeletableTopicResult()
- .setTopicId(id)
- .setErrorCode(error.code()));
- return new DeleteTopicsResponse(data);
- }
-
private static FindCoordinatorResponse prepareFindCoordinatorResponse(Errors error, Node node) {
return FindCoordinatorResponse.prepareResponse(error, node);
}
@@ -886,33 +871,8 @@ public class KafkaAdminClientTest {
future = env.adminClient().deleteTopics(singletonList("myTopic"),
new DeleteTopicsOptions()).all();
TestUtils.assertFutureError(future, UnknownTopicOrPartitionException.class);
-
- // With topic IDs
- Uuid topicId = Uuid.randomUuid();
-
- env.kafkaClient().prepareResponse(
- expectDeleteTopicsRequestWithTopicIds(topicId),
- prepareDeleteTopicsResponseWithTopicId(topicId, Errors.NONE));
- future = env.adminClient().deleteTopicsWithIds(singletonList(topicId),
- new DeleteTopicsOptions()).all();
- assertNull(future.get());
-
- env.kafkaClient().prepareResponse(
- expectDeleteTopicsRequestWithTopicIds(topicId),
- prepareDeleteTopicsResponseWithTopicId(topicId, Errors.TOPIC_DELETION_DISABLED));
- future = env.adminClient().deleteTopicsWithIds(singletonList(topicId),
- new DeleteTopicsOptions()).all();
- TestUtils.assertFutureError(future, TopicDeletionDisabledException.class);
-
- env.kafkaClient().prepareResponse(
- expectDeleteTopicsRequestWithTopicIds(topicId),
- prepareDeleteTopicsResponseWithTopicId(topicId, Errors.UNKNOWN_TOPIC_ID));
- future = env.adminClient().deleteTopicsWithIds(singletonList(topicId),
- new DeleteTopicsOptions()).all();
- TestUtils.assertFutureError(future, UnknownTopicIdException.class);
}
}
-
@Test
public void testDeleteTopicsPartialResponse() throws Exception {
@@ -929,20 +889,6 @@ public class KafkaAdminClientTest {
result.values().get("myTopic").get();
TestUtils.assertFutureThrows(result.values().get("myOtherTopic"), ApiException.class);
-
- // With topic IDs
- Uuid topicId1 = Uuid.randomUuid();
- Uuid topicId2 = Uuid.randomUuid();
- env.kafkaClient().prepareResponse(
- expectDeleteTopicsRequestWithTopicIds(topicId1, topicId2),
- prepareDeleteTopicsResponse(1000,
- deletableTopicResultWithId(topicId1, Errors.NONE)));
-
- DeleteTopicsWithIdsResult resultIds = env.adminClient().deleteTopicsWithIds(
- asList(topicId1, topicId2), new DeleteTopicsOptions());
-
- resultIds.values().get(topicId1).get();
- TestUtils.assertFutureThrows(resultIds.values().get(topicId2), ApiException.class);
}
}
@@ -975,36 +921,6 @@ public class KafkaAdminClientTest {
assertNull(result.values().get("topic1").get());
assertNull(result.values().get("topic2").get());
TestUtils.assertFutureThrows(result.values().get("topic3"), TopicExistsException.class);
-
- // With topic IDs
- Uuid topicId1 = Uuid.randomUuid();
- Uuid topicId2 = Uuid.randomUuid();
- Uuid topicId3 = Uuid.randomUuid();
-
- env.kafkaClient().prepareResponse(
- expectDeleteTopicsRequestWithTopicIds(topicId1, topicId2, topicId3),
- prepareDeleteTopicsResponse(1000,
- deletableTopicResultWithId(topicId1, Errors.NONE),
- deletableTopicResultWithId(topicId2, Errors.THROTTLING_QUOTA_EXCEEDED),
- deletableTopicResultWithId(topicId3, Errors.UNKNOWN_TOPIC_ID)));
-
- env.kafkaClient().prepareResponse(
- expectDeleteTopicsRequestWithTopicIds(topicId2),
- prepareDeleteTopicsResponse(1000,
- deletableTopicResultWithId(topicId2, Errors.THROTTLING_QUOTA_EXCEEDED)));
-
- env.kafkaClient().prepareResponse(
- expectDeleteTopicsRequestWithTopicIds(topicId2),
- prepareDeleteTopicsResponse(0,
- deletableTopicResultWithId(topicId2, Errors.NONE)));
-
- DeleteTopicsWithIdsResult resultIds = env.adminClient().deleteTopicsWithIds(
- asList(topicId1, topicId2, topicId3),
- new DeleteTopicsOptions().retryOnQuotaViolation(true));
-
- assertNull(resultIds.values().get(topicId1).get());
- assertNull(resultIds.values().get(topicId2).get());
- TestUtils.assertFutureThrows(resultIds.values().get(topicId3), UnknownTopicIdException.class);
}
}
@@ -1050,43 +966,6 @@ public class KafkaAdminClientTest {
ThrottlingQuotaExceededException.class);
assertEquals(0, e.throttleTimeMs());
TestUtils.assertFutureThrows(result.values().get("topic3"), TopicExistsException.class);
-
- // With topic IDs
- Uuid topicId1 = Uuid.randomUuid();
- Uuid topicId2 = Uuid.randomUuid();
- Uuid topicId3 = Uuid.randomUuid();
- env.kafkaClient().prepareResponse(
- expectDeleteTopicsRequestWithTopicIds(topicId1, topicId2, topicId3),
- prepareDeleteTopicsResponse(1000,
- deletableTopicResultWithId(topicId1, Errors.NONE),
- deletableTopicResultWithId(topicId2, Errors.THROTTLING_QUOTA_EXCEEDED),
- deletableTopicResultWithId(topicId3, Errors.UNKNOWN_TOPIC_ID)));
-
- env.kafkaClient().prepareResponse(
- expectDeleteTopicsRequestWithTopicIds(topicId2),
- prepareDeleteTopicsResponse(1000,
- deletableTopicResultWithId(topicId2, Errors.THROTTLING_QUOTA_EXCEEDED)));
-
- DeleteTopicsWithIdsResult resultIds = env.adminClient().deleteTopicsWithIds(
- asList(topicId1, topicId2, topicId3),
- new DeleteTopicsOptions().retryOnQuotaViolation(true));
-
- // Wait until the prepared attempts have consumed
- TestUtils.waitForCondition(() -> env.kafkaClient().numAwaitingResponses() == 0,
- "Failed awaiting DeleteTopics requests");
-
- // Wait until the next request is sent out
- TestUtils.waitForCondition(() -> env.kafkaClient().inFlightRequestCount() == 1,
- "Failed awaiting next DeleteTopics request");
-
- // Advance time past the default api timeout to time out the inflight request
- time.sleep(defaultApiTimeout + 1);
-
- assertNull(resultIds.values().get(topicId1).get());
- e = TestUtils.assertFutureThrows(resultIds.values().get(topicId2),
- ThrottlingQuotaExceededException.class);
- assertEquals(0, e.throttleTimeMs());
- TestUtils.assertFutureThrows(resultIds.values().get(topicId3), UnknownTopicIdException.class);
}
}
@@ -1111,27 +990,6 @@ public class KafkaAdminClientTest {
ThrottlingQuotaExceededException.class);
assertEquals(1000, e.throttleTimeMs());
TestUtils.assertFutureError(result.values().get("topic3"), TopicExistsException.class);
-
- // With topic IDs
- Uuid topicId1 = Uuid.randomUuid();
- Uuid topicId2 = Uuid.randomUuid();
- Uuid topicId3 = Uuid.randomUuid();
- env.kafkaClient().prepareResponse(
- expectDeleteTopicsRequestWithTopicIds(topicId1, topicId2, topicId3),
- prepareDeleteTopicsResponse(1000,
- deletableTopicResultWithId(topicId1, Errors.NONE),
- deletableTopicResultWithId(topicId2, Errors.THROTTLING_QUOTA_EXCEEDED),
- deletableTopicResultWithId(topicId3, Errors.UNKNOWN_TOPIC_ID)));
-
- DeleteTopicsWithIdsResult resultIds = env.adminClient().deleteTopicsWithIds(
- asList(topicId1, topicId2, topicId3),
- new DeleteTopicsOptions().retryOnQuotaViolation(false));
-
- assertNull(resultIds.values().get(topicId1).get());
- e = TestUtils.assertFutureThrows(resultIds.values().get(topicId2),
- ThrottlingQuotaExceededException.class);
- assertEquals(1000, e.throttleTimeMs());
- TestUtils.assertFutureError(resultIds.values().get(topicId3), UnknownTopicIdException.class);
}
}
@@ -1145,16 +1003,6 @@ public class KafkaAdminClientTest {
};
}
- private MockClient.RequestMatcher expectDeleteTopicsRequestWithTopicIds(final Uuid... topicIds) {
- return body -> {
- if (body instanceof DeleteTopicsRequest) {
- DeleteTopicsRequest request = (DeleteTopicsRequest) body;
- return request.topicIds().equals(Arrays.asList(topicIds));
- }
- return false;
- };
- }
-
@Test
public void testInvalidTopicNames() throws Exception {
try (AdminClientUnitTestEnv env = mockClientEnv()) {
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java b/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
index c2b9cff..f4ccc98 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
@@ -423,37 +423,6 @@ public class MockAdminClient extends AdminClient {
}
@Override
- synchronized public DeleteTopicsWithIdsResult deleteTopicsWithIds(Collection<Uuid> topicsToDelete, DeleteTopicsOptions options) {
- Map<Uuid, KafkaFuture<Void>> deleteTopicsWithIdsResult = new HashMap<>();
-
- if (timeoutNextRequests > 0) {
- for (final Uuid topicId : topicsToDelete) {
- KafkaFutureImpl<Void> future = new KafkaFutureImpl<>();
- future.completeExceptionally(new TimeoutException());
- deleteTopicsWithIdsResult.put(topicId, future);
- }
-
- --timeoutNextRequests;
- return new DeleteTopicsWithIdsResult(deleteTopicsWithIdsResult);
- }
-
- for (final Uuid topicId : topicsToDelete) {
- KafkaFutureImpl<Void> future = new KafkaFutureImpl<>();
-
- String name = topicNames.remove(topicId);
- if (name == null || allTopics.remove(name) == null) {
- future.completeExceptionally(new UnknownTopicOrPartitionException(String.format("Topic %s does not exist.", topicId)));
- } else {
- topicIds.remove(name);
- future.complete(null);
- }
- deleteTopicsWithIdsResult.put(topicId, future);
- }
-
- return new DeleteTopicsWithIdsResult(deleteTopicsWithIdsResult);
- }
-
- @Override
synchronized public CreatePartitionsResult createPartitions(Map<String, NewPartitions> newPartitions, CreatePartitionsOptions options) {
throw new UnsupportedOperationException("Not implemented yet");
}
diff --git a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
index 87cb257..049ef67 100644
--- a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
@@ -48,7 +48,6 @@ import org.slf4j.LoggerFactory
import scala.annotation.nowarn
import scala.jdk.CollectionConverters._
import scala.collection.Seq
-import scala.compat.java8.OptionConverters._
import scala.concurrent.duration.Duration
import scala.concurrent.{Await, Future}
import scala.util.Random
@@ -117,24 +116,6 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
}
@Test
- def testDeleteTopicsWithIds(): Unit = {
- client = Admin.create(createConfig)
- val topics = Seq("mytopic", "mytopic2", "mytopic3")
- val newTopics = Seq(
- new NewTopic("mytopic", Map((0: Integer) -> Seq[Integer](1, 2).asJava, (1: Integer) -> Seq[Integer](2, 0).asJava).asJava),
- new NewTopic("mytopic2", 3, 3.toShort),
- new NewTopic("mytopic3", Option.empty[Integer].asJava, Option.empty[java.lang.Short].asJava)
- )
- val createResult = client.createTopics(newTopics.asJava)
- createResult.all.get()
- waitForTopics(client, topics, List())
- val topicIds = getTopicIds().values.toSet
-
- client.deleteTopicsWithIds(topicIds.asJava).all.get()
- waitForTopics(client, List(), topics)
- }
-
- @Test
def testMetadataRefresh(): Unit = {
client = Admin.create(createConfig)
val topics = Seq("mytopic")