You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by sh...@apache.org on 2023/05/23 10:31:00 UTC

[kafka] 02/02: MINOR: Don't throw if MirrorMaker topics already exist (#13005)

This is an automated email from the ASF dual-hosted git repository.

showuon pushed a commit to branch 3.4
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit a08f2c51db588f3b224318a5d8a8c3de95fa8dad
Author: Mickael Maison <mi...@users.noreply.github.com>
AuthorDate: Tue Dec 20 14:33:32 2022 +0100

    MINOR: Don't throw if MirrorMaker topics already exist (#13005)
    
    
    Reviewers: Luke Chen <sh...@gmail.com>
---
 .../apache/kafka/connect/mirror/MirrorUtils.java   |  1 +
 .../kafka/connect/mirror/MirrorUtilsTest.java      | 87 ++++++++++++++++++++++
 2 files changed, 88 insertions(+)

diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorUtils.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorUtils.java
index dc3910da34c..eb6bdeebb9e 100644
--- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorUtils.java
+++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorUtils.java
@@ -130,6 +130,7 @@ final class MirrorUtils {
             Throwable cause = e.getCause();
             if (cause instanceof TopicExistsException) {
                 log.debug("Unable to create topic '{}' since it already exists.", topicName);
+                return;
             }
             if (cause instanceof UnsupportedVersionException) {
                 log.debug("Unable to create topic '{}' since the brokers do not support the CreateTopics API." +
diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorUtilsTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorUtilsTest.java
new file mode 100644
index 00000000000..bf44afebf91
--- /dev/null
+++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorUtilsTest.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.mirror;
+
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.CreateTopicsResult;
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.errors.ClusterAuthorizationException;
+import org.apache.kafka.common.errors.TopicExistsException;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.junit.jupiter.api.Test;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class MirrorUtilsTest {
+
+    private static final String TOPIC = "topic";
+
+    private final Admin admin = mock(Admin.class);
+    private final CreateTopicsResult ctr = mock(CreateTopicsResult.class);
+    @SuppressWarnings("unchecked")
+    private final KafkaFuture<Void> future = mock(KafkaFuture.class);
+
+    @Test
+    public void testCreateCompactedTopic() throws Exception {
+        Map<String, KafkaFuture<Void>> values = Collections.singletonMap(TOPIC, future);
+        when(future.get()).thenReturn(null);
+        when(ctr.values()).thenReturn(values);
+        when(admin.createTopics(any(), any())).thenReturn(ctr);
+        MirrorUtils.createCompactedTopic(TOPIC, (short) 1, (short) 1, admin);
+
+        verify(future).get();
+        verify(ctr).values();
+        verify(admin).createTopics(any(), any());
+    }
+
+    @Test
+    public void testCreateCompactedTopicAlreadyExists() throws Exception {
+        Map<String, KafkaFuture<Void>> values = Collections.singletonMap(TOPIC, future);
+        when(future.get()).thenThrow(new ExecutionException(new TopicExistsException("topic exists")));
+        when(ctr.values()).thenReturn(values);
+        when(admin.createTopics(any(), any())).thenReturn(ctr);
+        MirrorUtils.createCompactedTopic(TOPIC, (short) 1, (short) 1, admin);
+
+        verify(future).get();
+        verify(ctr).values();
+        verify(admin).createTopics(any(), any());
+    }
+
+    @Test
+    public void testCreateCompactedTopicFails() throws Exception {
+        Map<String, KafkaFuture<Void>> values = Collections.singletonMap(TOPIC, future);
+        when(future.get()).thenThrow(new ExecutionException(new ClusterAuthorizationException("not authorized")));
+        when(ctr.values()).thenReturn(values);
+        when(admin.createTopics(any(), any())).thenReturn(ctr);
+        Throwable ce = assertThrows(ConnectException.class, () -> MirrorUtils.createCompactedTopic(TOPIC, (short) 1, (short) 1, admin), "Should have exception thrown");
+
+        assertTrue(ce.getCause() instanceof ExecutionException);
+        assertTrue(ce.getCause().getCause() instanceof ClusterAuthorizationException);
+        verify(future).get();
+        verify(ctr).values();
+        verify(admin).createTopics(any(), any());
+    }
+}