You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2021/03/04 20:20:39 UTC

[kafka] branch trunk updated: HOTFIX: Controller topic deletion should be atomic (#10264)

This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 60a097a  HOTFIX: Controller topic deletion should be atomic (#10264)
60a097a is described below

commit 60a097ae406bf2b263d7651ead762ccf1845b14b
Author: Jason Gustafson <ja...@confluent.io>
AuthorDate: Thu Mar 4 12:19:34 2021 -0800

    HOTFIX: Controller topic deletion should be atomic (#10264)
    
    Topic deletions should be atomic. This fixes a build error that occurred because https://github.com/apache/kafka/pull/10253 and https://github.com/apache/kafka/pull/10184 were merged at about the same time.
    
    Reviewers: David Arthur <mu...@gmail.com>
---
 .../controller/ReplicationControlManager.java      |  2 +-
 .../controller/ReplicationControlManagerTest.java  | 24 ++++++++++++----------
 2 files changed, 14 insertions(+), 12 deletions(-)

diff --git a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
index 59798c4..df16479 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
@@ -624,7 +624,7 @@ public class ReplicationControlManager {
                 results.put(id, ApiError.fromThrowable(e));
             }
         }
-        return new ControllerResult<>(records, results);
+        return ControllerResult.atomicOf(records, results);
     }
 
     void deleteTopic(Uuid id, List<ApiMessageAndVersion> records) {
diff --git a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
index da6e4af..8b00c10 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
@@ -67,6 +67,7 @@ import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
 
 @Timeout(40)
@@ -451,19 +452,19 @@ public class ReplicationControlManagerTest {
         unfenceBroker(0, ctx);
         registerBroker(1, ctx);
         unfenceBroker(1, ctx);
-        ControllerResult<CreateTopicsResponseData> result =
+        ControllerResult<CreateTopicsResponseData> createResult =
             replicationControl.createTopics(request);
         CreateTopicsResponseData expectedResponse = new CreateTopicsResponseData();
-        Uuid topicId = result.response().topics().find("foo").topicId();
+        Uuid topicId = createResult.response().topics().find("foo").topicId();
         expectedResponse.topics().add(new CreatableTopicResult().setName("foo").
             setNumPartitions(3).setReplicationFactor((short) 2).
             setErrorMessage(null).setErrorCode((short) 0).
             setTopicId(topicId));
-        assertEquals(expectedResponse, result.response());
+        assertEquals(expectedResponse, createResult.response());
         // Until the records are replayed, no changes are made
         assertNull(replicationControl.getPartition(topicId, 0));
         assertEmptyTopicConfigs(ctx, "foo");
-        ctx.replay(result.records());
+        ctx.replay(createResult.records());
         assertNotNull(replicationControl.getPartition(topicId, 0));
         assertNotNull(replicationControl.getPartition(topicId, 1));
         assertNotNull(replicationControl.getPartition(topicId, 2));
@@ -483,17 +484,18 @@ public class ReplicationControlManagerTest {
             new ResultOrError<>(new ApiError(UNKNOWN_TOPIC_OR_PARTITION))),
                 replicationControl.findTopicIds(Long.MAX_VALUE, Collections.singleton("bar")));
 
-        ControllerResult<Map<Uuid, ApiError>> result1 = replicationControl.
+        ControllerResult<Map<Uuid, ApiError>> invalidDeleteResult = replicationControl.
             deleteTopics(Collections.singletonList(invalidId));
-        assertEquals(0, result1.records().size());
+        assertEquals(0, invalidDeleteResult.records().size());
         assertEquals(Collections.singletonMap(invalidId, new ApiError(UNKNOWN_TOPIC_ID, null)),
-            result1.response());
-        ControllerResult<Map<Uuid, ApiError>> result2 = replicationControl.
+            invalidDeleteResult.response());
+        ControllerResult<Map<Uuid, ApiError>> deleteResult = replicationControl.
             deleteTopics(Collections.singletonList(topicId));
+        assertTrue(deleteResult.isAtomic());
         assertEquals(Collections.singletonMap(topicId, new ApiError(NONE, null)),
-            result2.response());
-        assertEquals(1, result2.records().size());
-        ctx.replay(result2.records());
+            deleteResult.response());
+        assertEquals(1, deleteResult.records().size());
+        ctx.replay(deleteResult.records());
         assertNull(replicationControl.getPartition(topicId, 0));
         assertNull(replicationControl.getPartition(topicId, 1));
         assertNull(replicationControl.getPartition(topicId, 2));