You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/06/30 01:14:43 UTC

[GitHub] [kafka] hachikuji commented on a change in pull request #10892: KAFKA-13011: Update deleteTopics Admin API

hachikuji commented on a change in pull request #10892:
URL: https://github.com/apache/kafka/pull/10892#discussion_r661058030



##########
File path: clients/src/main/java/org/apache/kafka/common/TopicCollection.java
##########
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common;
+
+import java.util.Collection;
+import java.util.Collections;
+
+/**
+ * A class used to represent a collection of topics. This collection may define topics by topic name
+ * or topic ID. Subclassing this class beyond the classes provided here is not supported.
+ */
+public abstract class TopicCollection {
+
+    private TopicCollection() {}
+
+    /**
+     * @return a collection of topics defined by topic ID
+     */
+    public static TopicIdCollection ofTopicIds(Collection<Uuid> topics) {
+        return new TopicIdCollection(topics);

Review comment:
       Since the collection here comes from the user, maybe we should make a copy. Otherwise, the application could mutate it while we have a reference.

##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/DeleteTopicsResult.java
##########
@@ -30,24 +31,56 @@
  */
 @InterfaceStability.Evolving
 public class DeleteTopicsResult {
-    final Map<String, KafkaFuture<Void>> futures;
+    private final Map<Uuid, KafkaFuture<Void>> topicIdFutures;
+    private final Map<String, KafkaFuture<Void>> nameFutures;
 
-    protected DeleteTopicsResult(Map<String, KafkaFuture<Void>> futures) {
-        this.futures = futures;
+    protected DeleteTopicsResult(Map<Uuid, KafkaFuture<Void>> topicIdFutures, Map<String, KafkaFuture<Void>> nameFutures) {
+        if (topicIdFutures != null && nameFutures != null)

Review comment:
       This check only rejects the case where both are non-null, so it still allows both of them to be null. Is that intended?

##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##########
@@ -1688,15 +1691,27 @@ void handleFailure(Throwable throwable) {
     }
 
     @Override
-    public DeleteTopicsResult deleteTopics(final Collection<String> topicNames,
+    public DeleteTopicsResult deleteTopics(final TopicCollection topics,
                                            final DeleteTopicsOptions options) {
+        DeleteTopicsResult result;
+        if (topics instanceof TopicIdCollection)
+            result = DeleteTopicsResult.ofTopicIds(new HashMap<>(handleDeleteTopicsUsingIds(((TopicIdCollection) topics).topicIds(), options)));

Review comment:
       Why do we copy the result of `handleDeleteTopicsUsingIds`? Seems like that method is already returning a fresh map.

##########
File path: clients/src/test/java/org/apache/kafka/clients/admin/DeleteTopicsResultTest.java
##########
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.internals.KafkaFutureImpl;
+import org.junit.jupiter.api.Test;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.Collections;
+import java.util.Map;
+
+public class DeleteTopicsResultTest {
+
+    @Test
+    public void testDeleteTopicsResult() {

Review comment:
       nit: would it make sense to turn this into two separate tests? One for ids and one for names?

##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/DeleteTopicsResult.java
##########
@@ -30,24 +31,56 @@
  */
 @InterfaceStability.Evolving
 public class DeleteTopicsResult {
-    final Map<String, KafkaFuture<Void>> futures;
+    private final Map<Uuid, KafkaFuture<Void>> topicIdFutures;
+    private final Map<String, KafkaFuture<Void>> nameFutures;
 
-    protected DeleteTopicsResult(Map<String, KafkaFuture<Void>> futures) {
-        this.futures = futures;
+    protected DeleteTopicsResult(Map<Uuid, KafkaFuture<Void>> topicIdFutures, Map<String, KafkaFuture<Void>> nameFutures) {
+        if (topicIdFutures != null && nameFutures != null)
+            throw new IllegalArgumentException("topicIdFutures and nameFutures can not both be specified.");
+        this.topicIdFutures = topicIdFutures;
+        this.nameFutures = nameFutures;
     }
 
+    protected static DeleteTopicsResult ofTopicIds(Map<Uuid, KafkaFuture<Void>> topicIdFutures) {
+        DeleteTopicsResult result = new DeleteTopicsResult(topicIdFutures, null);
+        return result;
+    }
+
+    protected static DeleteTopicsResult ofTopicNames(Map<String, KafkaFuture<Void>> nameFutures) {
+        DeleteTopicsResult result = new DeleteTopicsResult(null, nameFutures);

Review comment:
       nit: the variable is not adding much. Could we just return?

##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/DeleteTopicsResult.java
##########
@@ -30,24 +31,56 @@
  */
 @InterfaceStability.Evolving
 public class DeleteTopicsResult {
-    final Map<String, KafkaFuture<Void>> futures;
+    private final Map<Uuid, KafkaFuture<Void>> topicIdFutures;
+    private final Map<String, KafkaFuture<Void>> nameFutures;
 
-    protected DeleteTopicsResult(Map<String, KafkaFuture<Void>> futures) {
-        this.futures = futures;
+    protected DeleteTopicsResult(Map<Uuid, KafkaFuture<Void>> topicIdFutures, Map<String, KafkaFuture<Void>> nameFutures) {
+        if (topicIdFutures != null && nameFutures != null)
+            throw new IllegalArgumentException("topicIdFutures and nameFutures can not both be specified.");
+        this.topicIdFutures = topicIdFutures;
+        this.nameFutures = nameFutures;
     }
 
+    protected static DeleteTopicsResult ofTopicIds(Map<Uuid, KafkaFuture<Void>> topicIdFutures) {

Review comment:
       Do these methods need to be protected or could they be package access?

##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/DeleteTopicsResult.java
##########
@@ -30,24 +31,56 @@
  */
 @InterfaceStability.Evolving
 public class DeleteTopicsResult {
-    final Map<String, KafkaFuture<Void>> futures;
+    private final Map<Uuid, KafkaFuture<Void>> topicIdFutures;
+    private final Map<String, KafkaFuture<Void>> nameFutures;
 
-    protected DeleteTopicsResult(Map<String, KafkaFuture<Void>> futures) {
-        this.futures = futures;
+    protected DeleteTopicsResult(Map<Uuid, KafkaFuture<Void>> topicIdFutures, Map<String, KafkaFuture<Void>> nameFutures) {
+        if (topicIdFutures != null && nameFutures != null)
+            throw new IllegalArgumentException("topicIdFutures and nameFutures can not both be specified.");

Review comment:
       nit: cannot?

##########
File path: clients/src/main/java/org/apache/kafka/common/TopicCollection.java
##########
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common;
+
+import java.util.Collection;
+import java.util.Collections;
+
+/**
+ * A class used to represent a collection of topics. This collection may define topics by topic name
+ * or topic ID. Subclassing this class beyond the classes provided here is not supported.
+ */
+public abstract class TopicCollection {
+
+    private TopicCollection() {}
+
+    /**
+     * @return a collection of topics defined by topic ID
+     */
+    public static TopicIdCollection ofTopicIds(Collection<Uuid> topics) {
+        return new TopicIdCollection(topics);
+    }
+
+    /**
+     * @return a collection of topics defined by topic name
+     */
+    public static TopicNameCollection ofTopicNames(Collection<String> topics) {
+        return new TopicNameCollection(topics);
+    }
+
+    /**
+     * A class used to represent a collection of topics defined by their topic ID.
+     * Subclassing this class beyond the classes provided here is not supported.
+     */
+    public static class TopicIdCollection extends TopicCollection {
+        private final Collection<Uuid> topicIds;
+
+        private TopicIdCollection(Collection<Uuid> topicIds) {
+            super();

Review comment:
       nit: we can remove this explicit `super()` call. There are a few of these in the PR.

##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/DeleteTopicsResult.java
##########
@@ -30,24 +31,56 @@
  */
 @InterfaceStability.Evolving
 public class DeleteTopicsResult {
-    final Map<String, KafkaFuture<Void>> futures;
+    private final Map<Uuid, KafkaFuture<Void>> topicIdFutures;
+    private final Map<String, KafkaFuture<Void>> nameFutures;
 
-    protected DeleteTopicsResult(Map<String, KafkaFuture<Void>> futures) {
-        this.futures = futures;
+    protected DeleteTopicsResult(Map<Uuid, KafkaFuture<Void>> topicIdFutures, Map<String, KafkaFuture<Void>> nameFutures) {
+        if (topicIdFutures != null && nameFutures != null)
+            throw new IllegalArgumentException("topicIdFutures and nameFutures can not both be specified.");
+        this.topicIdFutures = topicIdFutures;
+        this.nameFutures = nameFutures;
     }
 
+    protected static DeleteTopicsResult ofTopicIds(Map<Uuid, KafkaFuture<Void>> topicIdFutures) {
+        DeleteTopicsResult result = new DeleteTopicsResult(topicIdFutures, null);
+        return result;
+    }
+
+    protected static DeleteTopicsResult ofTopicNames(Map<String, KafkaFuture<Void>> nameFutures) {
+        DeleteTopicsResult result = new DeleteTopicsResult(null, nameFutures);
+        return result;
+    }
+
+    /**
+     * @return a map from topic IDs to futures which can be used to check the status of
+     * individual deletions if the deleteTopics request used topic IDs. Otherwise return null.
+     */
+    public Map<Uuid, KafkaFuture<Void>> topicIdValues() {
+        return topicIdFutures;
+    }
+
+    /**
+     * @return a map from topic names to futures which can be used to check the status of
+     * individual deletions if the deleteTopics request used topic names. Otherwise return null.
+     */
+    public Map<String, KafkaFuture<Void>> topicNameValues() {
+        return nameFutures;
+    }
+
+    @Deprecated

Review comment:
       It would be helpful to include the version in which this became deprecated. Also, the method should point the user to `topicNameValues`. For example:
   ```java
        * @deprecated Since 3.0 use {@link #topicNameValues} instead
   ```
   

##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/DeleteTopicsResult.java
##########
@@ -30,24 +31,56 @@
  */
 @InterfaceStability.Evolving
 public class DeleteTopicsResult {
-    final Map<String, KafkaFuture<Void>> futures;
+    private final Map<Uuid, KafkaFuture<Void>> topicIdFutures;
+    private final Map<String, KafkaFuture<Void>> nameFutures;
 
-    protected DeleteTopicsResult(Map<String, KafkaFuture<Void>> futures) {
-        this.futures = futures;
+    protected DeleteTopicsResult(Map<Uuid, KafkaFuture<Void>> topicIdFutures, Map<String, KafkaFuture<Void>> nameFutures) {
+        if (topicIdFutures != null && nameFutures != null)
+            throw new IllegalArgumentException("topicIdFutures and nameFutures can not both be specified.");
+        this.topicIdFutures = topicIdFutures;
+        this.nameFutures = nameFutures;
     }
 
+    protected static DeleteTopicsResult ofTopicIds(Map<Uuid, KafkaFuture<Void>> topicIdFutures) {
+        DeleteTopicsResult result = new DeleteTopicsResult(topicIdFutures, null);
+        return result;
+    }
+
+    protected static DeleteTopicsResult ofTopicNames(Map<String, KafkaFuture<Void>> nameFutures) {
+        DeleteTopicsResult result = new DeleteTopicsResult(null, nameFutures);
+        return result;
+    }
+
+    /**

Review comment:
       I think we can add a brief description here that refers back to the Admin API call that uses `TopicIdCollection`. The same applies to the method below.

##########
File path: clients/src/test/java/org/apache/kafka/clients/admin/TopicCollectionTest.java
##########
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.TopicCollection;
+import org.apache.kafka.common.TopicCollection.TopicIdCollection;
+import org.apache.kafka.common.TopicCollection.TopicNameCollection;
+import org.junit.jupiter.api.Test;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.Arrays;
+import java.util.List;
+
+public class TopicCollectionTest {
+
+    @Test
+    public void testTopicCollection() {
+
+        List<Uuid> topicIds = Arrays.asList(Uuid.randomUuid(), Uuid.randomUuid(), Uuid.randomUuid());
+        List<String> topicNames = Arrays.asList("foo", "bar");
+
+        TopicCollection idCollection = TopicCollection.ofTopicIds(topicIds);
+        TopicCollection nameCollection = TopicCollection.ofTopicNames(topicNames);
+
+        assertTrue(((TopicIdCollection) idCollection).topicIds().containsAll(topicIds));
+        assertTrue(((TopicNameCollection) nameCollection).topicNames().containsAll(topicNames));
+    }
+
+}

Review comment:
       nit: add a newline at the end of the file




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org