You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by sh...@apache.org on 2023/05/23 10:30:58 UTC

[kafka] branch 3.4 updated (2f134711817 -> a08f2c51db5)

This is an automated email from the ASF dual-hosted git repository.

showuon pushed a change to branch 3.4
in repository https://gitbox.apache.org/repos/asf/kafka.git


    from 2f134711817 KAFKA-14862 (HOTFIX): Fix ConcurrentModificationException (#13734)
     new f683178805c Fix log DateTime format unit test (#13441)
     new a08f2c51db5 MINOR: Don't throw if MirrorMaker topics already exist (#13005)

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../org/apache/kafka/common/utils/UtilsTest.java   | 10 +--
 .../apache/kafka/connect/mirror/MirrorUtils.java   |  1 +
 .../kafka/connect/mirror/MirrorUtilsTest.java      | 87 ++++++++++++++++++++++
 3 files changed, 92 insertions(+), 6 deletions(-)
 create mode 100644 connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorUtilsTest.java


[kafka] 02/02: MINOR: Don't throw if MirrorMaker topics already exist (#13005)

Posted by sh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

showuon pushed a commit to branch 3.4
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit a08f2c51db588f3b224318a5d8a8c3de95fa8dad
Author: Mickael Maison <mi...@users.noreply.github.com>
AuthorDate: Tue Dec 20 14:33:32 2022 +0100

    MINOR: Don't throw if MirrorMaker topics already exist (#13005)
    
    
    Reviewers: Luke Chen <sh...@gmail.com>
---
 .../apache/kafka/connect/mirror/MirrorUtils.java   |  1 +
 .../kafka/connect/mirror/MirrorUtilsTest.java      | 87 ++++++++++++++++++++++
 2 files changed, 88 insertions(+)

diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorUtils.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorUtils.java
index dc3910da34c..eb6bdeebb9e 100644
--- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorUtils.java
+++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorUtils.java
@@ -130,6 +130,7 @@ final class MirrorUtils {
             Throwable cause = e.getCause();
             if (cause instanceof TopicExistsException) {
                 log.debug("Unable to create topic '{}' since it already exists.", topicName);
+                return;
             }
             if (cause instanceof UnsupportedVersionException) {
                 log.debug("Unable to create topic '{}' since the brokers do not support the CreateTopics API." +
diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorUtilsTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorUtilsTest.java
new file mode 100644
index 00000000000..bf44afebf91
--- /dev/null
+++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorUtilsTest.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.mirror;
+
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.CreateTopicsResult;
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.errors.ClusterAuthorizationException;
+import org.apache.kafka.common.errors.TopicExistsException;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.junit.jupiter.api.Test;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class MirrorUtilsTest {
+
+    private static final String TOPIC = "topic";
+
+    private final Admin admin = mock(Admin.class);
+    private final CreateTopicsResult ctr = mock(CreateTopicsResult.class);
+    @SuppressWarnings("unchecked")
+    private final KafkaFuture<Void> future = mock(KafkaFuture.class);
+
+    @Test
+    public void testCreateCompactedTopic() throws Exception {
+        Map<String, KafkaFuture<Void>> values = Collections.singletonMap(TOPIC, future);
+        when(future.get()).thenReturn(null);
+        when(ctr.values()).thenReturn(values);
+        when(admin.createTopics(any(), any())).thenReturn(ctr);
+        MirrorUtils.createCompactedTopic(TOPIC, (short) 1, (short) 1, admin);
+
+        verify(future).get();
+        verify(ctr).values();
+        verify(admin).createTopics(any(), any());
+    }
+
+    @Test
+    public void testCreateCompactedTopicAlreadyExists() throws Exception {
+        Map<String, KafkaFuture<Void>> values = Collections.singletonMap(TOPIC, future);
+        when(future.get()).thenThrow(new ExecutionException(new TopicExistsException("topic exists")));
+        when(ctr.values()).thenReturn(values);
+        when(admin.createTopics(any(), any())).thenReturn(ctr);
+        MirrorUtils.createCompactedTopic(TOPIC, (short) 1, (short) 1, admin);
+
+        verify(future).get();
+        verify(ctr).values();
+        verify(admin).createTopics(any(), any());
+    }
+
+    @Test
+    public void testCreateCompactedTopicFails() throws Exception {
+        Map<String, KafkaFuture<Void>> values = Collections.singletonMap(TOPIC, future);
+        when(future.get()).thenThrow(new ExecutionException(new ClusterAuthorizationException("not authorized")));
+        when(ctr.values()).thenReturn(values);
+        when(admin.createTopics(any(), any())).thenReturn(ctr);
+        Throwable ce = assertThrows(ConnectException.class, () -> MirrorUtils.createCompactedTopic(TOPIC, (short) 1, (short) 1, admin), "Should have exception thrown");
+
+        assertTrue(ce.getCause() instanceof ExecutionException);
+        assertTrue(ce.getCause().getCause() instanceof ClusterAuthorizationException);
+        verify(future).get();
+        verify(ctr).values();
+        verify(admin).createTopics(any(), any());
+    }
+}


[kafka] 01/02: Fix log DateTime format unit test (#13441)

Posted by sh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

showuon pushed a commit to branch 3.4
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit f683178805caad6eca9d232503fe4b9f533f76ff
Author: egyedt <81...@users.noreply.github.com>
AuthorDate: Mon Mar 27 10:48:47 2023 +0200

    Fix log DateTime format unit test (#13441)
    
    Reviewers: Viktor Somogyi-Vass <vi...@gmail.com>
---
 .../src/test/java/org/apache/kafka/common/utils/UtilsTest.java | 10 ++++------
 1 file changed, 4 insertions(+), 6 deletions(-)

diff --git a/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java b/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java
index d10fd37a71b..c75ad9d5e4f 100755
--- a/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java
@@ -34,7 +34,6 @@ import java.nio.channels.FileChannel;
 import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
 import java.nio.file.StandardOpenOption;
-import java.time.Instant;
 import java.time.LocalDateTime;
 import java.time.ZoneId;
 import java.time.ZoneOffset;
@@ -915,14 +914,13 @@ public class UtilsTest {
 
     @Test
     public void testToLogDateTimeFormat() {
+        final LocalDateTime timestampWithMilliSeconds = LocalDateTime.of(2020, 11, 9, 12, 34, 5, 123000000);
+        final LocalDateTime timestampWithSeconds = LocalDateTime.of(2020, 11, 9, 12, 34, 5);
+
         DateTimeFormatter offsetFormatter = DateTimeFormatter.ofPattern("XXX");
-        ZoneOffset offset = ZoneId.systemDefault().getRules().getOffset(Instant.now());
-        
+        ZoneOffset offset = ZoneId.systemDefault().getRules().getOffset(timestampWithSeconds);
         String requiredOffsetFormat = offsetFormatter.format(offset);
 
-        final LocalDateTime timestampWithMilliSeconds = LocalDateTime.of(2020, 11, 9, 12, 34, 5, 123000000);
-        final LocalDateTime timestampWithSeconds = LocalDateTime.of(2020, 11, 9, 12, 34, 5);
-        
         assertEquals(String.format("2020-11-09 12:34:05,123 %s", requiredOffsetFormat), Utils.toLogDateTimeFormat(timestampWithMilliSeconds.atZone(ZoneId.systemDefault()).toInstant().toEpochMilli()));
         assertEquals(String.format("2020-11-09 12:34:05,000 %s", requiredOffsetFormat), Utils.toLogDateTimeFormat(timestampWithSeconds.atZone(ZoneId.systemDefault()).toInstant().toEpochMilli()));
     }