You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by rh...@apache.org on 2021/02/04 01:09:45 UTC

[kafka] branch 2.6 updated: KAFKA-12270: Handle race condition when Connect tasks attempt to create topics (#10032)

This is an automated email from the ASF dual-hosted git repository.

rhauch pushed a commit to branch 2.6
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.6 by this push:
     new 0cdbfa7  KAFKA-12270: Handle race condition when Connect tasks attempt to create topics (#10032)
0cdbfa7 is described below

commit 0cdbfa7836ba3c98ca7a80031fdaeec70da9a887
Author: Randall Hauch <rh...@gmail.com>
AuthorDate: Wed Feb 3 18:29:55 2021 -0600

    KAFKA-12270: Handle race condition when Connect tasks attempt to create topics (#10032)
    
    When a source connector that is configured to create missing topics has multiple tasks that generate records for the same topic, it is possible that multiple tasks may simultaneously describe the topic, find it does not exist, and attempt to create the topic. One of those create topic requests will succeed, and the other concurrent tasks will receive the response from the topic admin as having not created the topic and will fail unnecessarily.
    
    This change corrects the logic by moving the `TopicAdmin` logic to create a topic to a new `createOrFindTopics(…)` method that returns the set of created topic names and the set of existing topic names. This allows the existing `createTopics(…)` and `createTopic(…)` methods to retain the same behavior, but also allows the `WorkerSourceTask` to know from its single call to this new method whether the topic was created or was found to exist.
    
    This adds two unit tests and modifies several unit tests in `WorkerSourceTaskWithTopicCreationTest` that use mocks to verify the behavior, and modifies several existing unit tests in `TopicAdminTest` to ensure the logic of the new method is as expected.
    
    Author: Randall Hauch <rh...@gmail.com>
    Reviewer: Konstantine Karantasis <ko...@confluent.io>
---
 .../kafka/connect/runtime/WorkerSourceTask.java    |   7 +-
 .../org/apache/kafka/connect/util/TopicAdmin.java  | 103 +++++++++++++++++++--
 .../ErrorHandlingTaskWithTopicCreationTest.java    |   6 +-
 .../WorkerSourceTaskWithTopicCreationTest.java     |  69 ++++++++++++--
 .../apache/kafka/connect/util/TopicAdminTest.java  |  31 +++++--
 5 files changed, 192 insertions(+), 24 deletions(-)

diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
index 3f7af72..482358d 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
@@ -412,10 +412,15 @@ class WorkerSourceTask extends WorkerTask {
         log.debug("Topic '{}' matched topic creation group: {}", topic, topicGroup);
         NewTopic newTopic = topicGroup.newTopic(topic);
 
-        if (admin.createTopic(newTopic)) {
+        TopicAdmin.TopicCreationResponse response = admin.createOrFindTopics(newTopic);
+        if (response.isCreated(newTopic.name())) {
             topicCreation.addTopic(topic);
             log.info("Created topic '{}' using creation group {}", newTopic, topicGroup);
+        } else if (response.isExisting(newTopic.name())) {
+            topicCreation.addTopic(topic);
+            log.info("Found existing topic '{}'", newTopic);
         } else {
+            // The topic still does not exist and could not be created, so treat it as a task failure
             log.warn("Request to create new topic '{}' failed", topic);
             throw new ConnectException("Task failed to create new topic " + newTopic + ". Ensure "
                     + "that the task is authorized to create topics or that the topic exists and "
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java b/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java
index 615e7a3..428343b 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java
@@ -60,6 +60,60 @@ import java.util.stream.Collectors;
  */
 public class TopicAdmin implements AutoCloseable {
 
+    public static final TopicCreationResponse EMPTY_CREATION = new TopicCreationResponse(Collections.emptySet(), Collections.emptySet());
+
+    public static class TopicCreationResponse {
+
+        private final Set<String> created;
+        private final Set<String> existing;
+
+        public TopicCreationResponse(Set<String> createdTopicNames, Set<String> existingTopicNames) {
+            this.created = Collections.unmodifiableSet(createdTopicNames);
+            this.existing = Collections.unmodifiableSet(existingTopicNames);
+        }
+
+        public Set<String> createdTopics() {
+            return created;
+        }
+
+        public Set<String> existingTopics() {
+            return existing;
+        }
+
+        public boolean isCreated(String topicName) {
+            return created.contains(topicName);
+        }
+
+        public boolean isExisting(String topicName) {
+            return existing.contains(topicName);
+        }
+
+        public boolean isCreatedOrExisting(String topicName) {
+            return isCreated(topicName) || isExisting(topicName);
+        }
+
+        public int createdTopicsCount() {
+            return created.size();
+        }
+
+        public int existingTopicsCount() {
+            return existing.size();
+        }
+
+        public int createdOrExistingTopicsCount() {
+            return createdTopicsCount() + existingTopicsCount();
+        }
+
+        public boolean isEmpty() {
+            return createdOrExistingTopicsCount() == 0;
+        }
+
+        @Override
+        public String toString() {
+            return "TopicCreationResponse{" + "created=" + created + ", existing=" + existing + '}';
+        }
+    }
+
     public static final int NO_PARTITIONS = -1;
     public static final short NO_REPLICATION_FACTOR = -1;
 
@@ -228,7 +282,7 @@ public class TopicAdmin implements AutoCloseable {
         this.logCreation = logCreation;
     }
 
-   /**
+    /**
      * Attempt to create the topic described by the given definition, returning true if the topic was created or false
      * if the topic already existed.
      *
@@ -260,13 +314,48 @@ public class TopicAdmin implements AutoCloseable {
      *                                     attempting to perform this operation
      */
     public Set<String> createTopics(NewTopic... topics) {
+        return createOrFindTopics(topics).createdTopics();
+    }
+
+    /**
+     * Attempt to find or create the topic described by the given definition, returning true if the topic was created or had
+     * already existed, or false if the topic did not exist and could not be created.
+     *
+     * @param topic the specification of the topic
+     * @return true if the topic was created or already existed, or false if the topic did not exist and could not be created.
+     * @throws ConnectException            if an error occurs, the operation takes too long, or the thread is interrupted while
+     *                                     attempting to perform this operation
+     * @throws UnsupportedVersionException if the broker does not support the necessary APIs to perform this request
+     */
+    public boolean createOrFindTopic(NewTopic topic) {
+        if (topic == null) return false;
+        return createOrFindTopics(topic).isCreatedOrExisting(topic.name());
+    }
+
+    /**
+     * Attempt to create the topics described by the given definitions, returning the names of those topics that
+     * were created by this request as well as the names of those that already existed. Any existing topics with
+     * the same name are left unchanged, and their names are reported as existing rather than created.
+     * <p>
+     * If multiple topic definitions have the same topic name, the last one with that name will be used.
+     * <p>
+     * Apache Kafka added support for creating topics in 0.10.1.0, so this method works as expected with that and later versions.
+     * With brokers older than 0.10.1.0, this method is unable to create topics and always returns an empty response.
+     *
+     * @param topics the specifications of the topics
+     * @return the {@link TopicCreationResponse} with the names of the newly created and existing topics;
+     *         never null but possibly empty
+     * @throws ConnectException if an error occurs, the operation takes too long, or the thread is interrupted while
+     *                          attempting to perform this operation
+     */
+    public TopicCreationResponse createOrFindTopics(NewTopic... topics) {
         Map<String, NewTopic> topicsByName = new HashMap<>();
         if (topics != null) {
             for (NewTopic topic : topics) {
                 if (topic != null) topicsByName.put(topic.name(), topic);
             }
         }
-        if (topicsByName.isEmpty()) return Collections.emptySet();
+        if (topicsByName.isEmpty()) return EMPTY_CREATION;
         String bootstrapServers = bootstrapServers();
         String topicNameList = Utils.join(topicsByName.keySet(), "', '");
 
@@ -276,6 +365,7 @@ public class TopicAdmin implements AutoCloseable {
 
         // Iterate over each future so that we can handle individual failures like when some topics already exist
         Set<String> newlyCreatedTopicNames = new HashSet<>();
+        Set<String> existingTopicNames = new HashSet<>();
         for (Map.Entry<String, KafkaFuture<Void>> entry : newResults.entrySet()) {
             String topic = entry.getKey();
             try {
@@ -288,25 +378,26 @@ public class TopicAdmin implements AutoCloseable {
                 Throwable cause = e.getCause();
                 if (cause instanceof TopicExistsException) {
                     log.debug("Found existing topic '{}' on the brokers at {}", topic, bootstrapServers);
+                    existingTopicNames.add(topic);
                     continue;
                 }
                 if (cause instanceof UnsupportedVersionException) {
                     log.debug("Unable to create topic(s) '{}' since the brokers at {} do not support the CreateTopics API." +
                             " Falling back to assume topic(s) exist or will be auto-created by the broker.",
                             topicNameList, bootstrapServers);
-                    return Collections.emptySet();
+                    return EMPTY_CREATION;
                 }
                 if (cause instanceof ClusterAuthorizationException) {
                     log.debug("Not authorized to create topic(s) '{}' upon the brokers {}." +
                             " Falling back to assume topic(s) exist or will be auto-created by the broker.",
                             topicNameList, bootstrapServers);
-                    return Collections.emptySet();
+                    return EMPTY_CREATION;
                 }
                 if (cause instanceof TopicAuthorizationException) {
                     log.debug("Not authorized to create topic(s) '{}' upon the brokers {}." +
                                     " Falling back to assume topic(s) exist or will be auto-created by the broker.",
                             topicNameList, bootstrapServers);
-                    return Collections.emptySet();
+                    return EMPTY_CREATION;
                 }
                 if (cause instanceof InvalidConfigurationException) {
                     throw new ConnectException("Unable to create topic(s) '" + topicNameList + "': " + cause.getMessage(),
@@ -324,7 +415,7 @@ public class TopicAdmin implements AutoCloseable {
                 throw new ConnectException("Interrupted while attempting to create/find topic(s) '" + topicNameList + "'", e);
             }
         }
-        return newlyCreatedTopicNames;
+        return new TopicCreationResponse(newlyCreatedTopicNames, existingTopicNames);
     }
 
     /**
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskWithTopicCreationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskWithTopicCreationTest.java
index 9c115ac..692f4d2 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskWithTopicCreationTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskWithTopicCreationTest.java
@@ -79,6 +79,7 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Set;
 
 import static java.util.Collections.emptyMap;
 import static java.util.Collections.singletonList;
@@ -529,7 +530,10 @@ public class ErrorHandlingTaskWithTopicCreationTest {
             EasyMock.expect(admin.describeTopics(topic)).andReturn(Collections.emptyMap());
 
             Capture<NewTopic> newTopicCapture = EasyMock.newCapture();
-            EasyMock.expect(admin.createTopic(EasyMock.capture(newTopicCapture))).andReturn(true);
+            Set<String> created = Collections.singleton(topic);
+            Set<String> existing = Collections.emptySet();
+            TopicAdmin.TopicCreationResponse response = new TopicAdmin.TopicCreationResponse(created, existing);
+            EasyMock.expect(admin.createOrFindTopics(EasyMock.capture(newTopicCapture))).andReturn(response);
         }
     }
 
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskWithTopicCreationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskWithTopicCreationTest.java
index 06862d1..0324168 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskWithTopicCreationTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskWithTopicCreationTest.java
@@ -80,6 +80,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -956,7 +957,7 @@ public class WorkerSourceTaskWithTopicCreationTest extends ThreadedTest {
         expectPreliminaryCalls();
         EasyMock.expect(admin.describeTopics(TOPIC)).andReturn(Collections.emptyMap());
         Capture<NewTopic> newTopicCapture = EasyMock.newCapture();
-        EasyMock.expect(admin.createTopic(EasyMock.capture(newTopicCapture)))
+        EasyMock.expect(admin.createOrFindTopics(EasyMock.capture(newTopicCapture)))
                 .andThrow(new RetriableException(new TimeoutException("timeout")));
 
         // Second round
@@ -1034,7 +1035,7 @@ public class WorkerSourceTaskWithTopicCreationTest extends ThreadedTest {
         EasyMock.expect(admin.describeTopics(OTHER_TOPIC)).andReturn(Collections.emptyMap());
         // First call to create the topic times out
         Capture<NewTopic> newTopicCapture = EasyMock.newCapture();
-        EasyMock.expect(admin.createTopic(EasyMock.capture(newTopicCapture)))
+        EasyMock.expect(admin.createOrFindTopics(EasyMock.capture(newTopicCapture)))
                 .andThrow(new RetriableException(new TimeoutException("timeout")));
 
         // Second round
@@ -1085,7 +1086,7 @@ public class WorkerSourceTaskWithTopicCreationTest extends ThreadedTest {
         EasyMock.expect(admin.describeTopics(TOPIC)).andReturn(Collections.emptyMap());
 
         Capture<NewTopic> newTopicCapture = EasyMock.newCapture();
-        EasyMock.expect(admin.createTopic(EasyMock.capture(newTopicCapture)))
+        EasyMock.expect(admin.createOrFindTopics(EasyMock.capture(newTopicCapture)))
                 .andThrow(new ConnectException(new TopicAuthorizationException("unauthorized")));
 
         PowerMock.replayAll();
@@ -1096,7 +1097,7 @@ public class WorkerSourceTaskWithTopicCreationTest extends ThreadedTest {
     }
 
     @Test(expected = ConnectException.class)
-    public void testTopicCreateFailsWithExceptionWhenCreateReturnsFalse() throws Exception {
+    public void testTopicCreateFailsWithExceptionWhenCreateReturnsTopicNotCreatedOrFound() throws Exception {
         createWorkerTask();
 
         SourceRecord record1 = new SourceRecord(PARTITION, OFFSET, TOPIC, 1, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
@@ -1106,7 +1107,7 @@ public class WorkerSourceTaskWithTopicCreationTest extends ThreadedTest {
         EasyMock.expect(admin.describeTopics(TOPIC)).andReturn(Collections.emptyMap());
 
         Capture<NewTopic> newTopicCapture = EasyMock.newCapture();
-        EasyMock.expect(admin.createTopic(EasyMock.capture(newTopicCapture))).andReturn(false);
+        EasyMock.expect(admin.createOrFindTopics(EasyMock.capture(newTopicCapture))).andReturn(TopicAdmin.EMPTY_CREATION);
 
         PowerMock.replayAll();
 
@@ -1115,6 +1116,62 @@ public class WorkerSourceTaskWithTopicCreationTest extends ThreadedTest {
         assertNotNull(newTopicCapture.getValue());
     }
 
+    @Test
+    public void testTopicCreateSucceedsWhenCreateReturnsExistingTopicFound() throws Exception {
+        createWorkerTask();
+
+        SourceRecord record1 = new SourceRecord(PARTITION, OFFSET, TOPIC, 1, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
+        SourceRecord record2 = new SourceRecord(PARTITION, OFFSET, TOPIC, 2, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
+
+        expectPreliminaryCalls();
+        EasyMock.expect(admin.describeTopics(TOPIC)).andReturn(Collections.emptyMap());
+
+        Capture<NewTopic> newTopicCapture = EasyMock.newCapture();
+        EasyMock.expect(admin.createOrFindTopics(EasyMock.capture(newTopicCapture))).andReturn(foundTopic(TOPIC));
+
+        expectSendRecordTaskCommitRecordSucceed(false, false);
+        expectSendRecordTaskCommitRecordSucceed(false, false);
+
+        PowerMock.replayAll();
+
+        Whitebox.setInternalState(workerTask, "toSend", Arrays.asList(record1, record2));
+        Whitebox.invokeMethod(workerTask, "sendRecords");
+    }
+
+    @Test
+    public void testTopicCreateSucceedsWhenCreateReturnsNewTopicFound() throws Exception {
+        createWorkerTask();
+
+        SourceRecord record1 = new SourceRecord(PARTITION, OFFSET, TOPIC, 1, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
+        SourceRecord record2 = new SourceRecord(PARTITION, OFFSET, TOPIC, 2, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
+
+        expectPreliminaryCalls();
+        EasyMock.expect(admin.describeTopics(TOPIC)).andReturn(Collections.emptyMap());
+
+        Capture<NewTopic> newTopicCapture = EasyMock.newCapture();
+        EasyMock.expect(admin.createOrFindTopics(EasyMock.capture(newTopicCapture))).andReturn(createdTopic(TOPIC));
+
+        expectSendRecordTaskCommitRecordSucceed(false, false);
+        expectSendRecordTaskCommitRecordSucceed(false, false);
+
+        PowerMock.replayAll();
+
+        Whitebox.setInternalState(workerTask, "toSend", Arrays.asList(record1, record2));
+        Whitebox.invokeMethod(workerTask, "sendRecords");
+    }
+
+    private TopicAdmin.TopicCreationResponse createdTopic(String topic) {
+        Set<String> created = Collections.singleton(topic);
+        Set<String> existing = Collections.emptySet();
+        return new TopicAdmin.TopicCreationResponse(created, existing);
+    }
+
+    private TopicAdmin.TopicCreationResponse foundTopic(String topic) {
+        Set<String> created = Collections.emptySet();
+        Set<String> existing = Collections.singleton(topic);
+        return new TopicAdmin.TopicCreationResponse(created, existing);
+    }
+
     private void expectPreliminaryCalls() {
         expectPreliminaryCalls(TOPIC);
     }
@@ -1431,7 +1488,7 @@ public class WorkerSourceTaskWithTopicCreationTest extends ThreadedTest {
         if (config.topicCreationEnable()) {
             EasyMock.expect(admin.describeTopics(topic)).andReturn(Collections.emptyMap());
             Capture<NewTopic> newTopicCapture = EasyMock.newCapture();
-            EasyMock.expect(admin.createTopic(EasyMock.capture(newTopicCapture))).andReturn(true);
+            EasyMock.expect(admin.createOrFindTopics(EasyMock.capture(newTopicCapture))).andReturn(createdTopic(topic));
         }
     }
 }
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicAdminTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicAdminTest.java
index b655664..e4d069d 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicAdminTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicAdminTest.java
@@ -71,15 +71,14 @@ public class TopicAdminTest {
      * create no topics, and return false.
      */
     @Test
-    public void returnNullWithApiVersionMismatchOnCreate() {
+    public void returnEmptyWithApiVersionMismatchOnCreate() {
         final NewTopic newTopic = TopicAdmin.defineTopic("myTopic").partitions(1).compacted().build();
         Cluster cluster = createCluster(1);
         try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(new MockTime(), cluster)) {
             env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
             env.kafkaClient().prepareResponse(createTopicResponseWithUnsupportedVersion(newTopic));
             TopicAdmin admin = new TopicAdmin(null, env.adminClient());
-            boolean created = admin.createTopic(newTopic);
-            assertFalse(created);
+            assertTrue(admin.createOrFindTopics(newTopic).isEmpty());
         }
     }
 
@@ -102,14 +101,16 @@ public class TopicAdminTest {
     }
 
     @Test
-    public void returnNullWithClusterAuthorizationFailureOnCreate() {
+    public void returnEmptyWithClusterAuthorizationFailureOnCreate() {
         final NewTopic newTopic = TopicAdmin.defineTopic("myTopic").partitions(1).compacted().build();
         Cluster cluster = createCluster(1);
         try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(new MockTime(), cluster)) {
             env.kafkaClient().prepareResponse(createTopicResponseWithClusterAuthorizationException(newTopic));
             TopicAdmin admin = new TopicAdmin(null, env.adminClient());
-            boolean created = admin.createTopic(newTopic);
-            assertFalse(created);
+            assertFalse(admin.createTopic(newTopic));
+
+            env.kafkaClient().prepareResponse(createTopicResponseWithClusterAuthorizationException(newTopic));
+            assertTrue(admin.createOrFindTopics(newTopic).isEmpty());
         }
     }
 
@@ -126,14 +127,16 @@ public class TopicAdminTest {
     }
 
     @Test
-    public void returnNullWithTopicAuthorizationFailureOnCreate() {
+    public void returnEmptyWithTopicAuthorizationFailureOnCreate() {
         final NewTopic newTopic = TopicAdmin.defineTopic("myTopic").partitions(1).compacted().build();
         Cluster cluster = createCluster(1);
         try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(new MockTime(), cluster)) {
             env.kafkaClient().prepareResponse(createTopicResponseWithTopicAuthorizationException(newTopic));
             TopicAdmin admin = new TopicAdmin(null, env.adminClient());
-            boolean created = admin.createTopic(newTopic);
-            assertFalse(created);
+            assertFalse(admin.createTopic(newTopic));
+
+            env.kafkaClient().prepareResponse(createTopicResponseWithTopicAuthorizationException(newTopic));
+            assertTrue(admin.createOrFindTopics(newTopic).isEmpty());
         }
     }
 
@@ -158,6 +161,12 @@ public class TopicAdminTest {
             mockAdminClient.addTopic(false, "myTopic", Collections.singletonList(topicPartitionInfo), null);
             TopicAdmin admin = new TopicAdmin(null, mockAdminClient);
             assertFalse(admin.createTopic(newTopic));
+            assertTrue(admin.createTopics(newTopic).isEmpty());
+            assertTrue(admin.createOrFindTopic(newTopic));
+            TopicAdmin.TopicCreationResponse response = admin.createOrFindTopics(newTopic);
+            assertTrue(response.isCreatedOrExisting(newTopic.name()));
+            assertTrue(response.isExisting(newTopic.name()));
+            assertFalse(response.isCreated(newTopic.name()));
         }
     }
 
@@ -506,7 +515,9 @@ public class TopicAdminTest {
         clientBuilder.controller(0);
         try (MockAdminClient admin = clientBuilder.build()) {
             TopicAdmin topicClient = new TopicAdmin(null, admin, false);
-            assertTrue(topicClient.createTopic(newTopic));
+            TopicAdmin.TopicCreationResponse response = topicClient.createOrFindTopics(newTopic);
+            assertTrue(response.isCreated(newTopic.name()));
+            assertFalse(response.isExisting(newTopic.name()));
             assertTopic(admin, newTopic.name(), expectedPartitions, expectedReplicas);
         }
     }