You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ya...@apache.org on 2023/10/12 08:55:50 UTC

[kafka] branch trunk updated: KAFKA-15570: Add unit tests for MemoryConfigBackingStore (#14518)

This is an automated email from the ASF dual-hosted git repository.

yashmayya pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 41c0d44402b KAFKA-15570: Add unit tests for MemoryConfigBackingStore (#14518)
41c0d44402b is described below

commit 41c0d44402bc7d0ff872de9a43f2669da4414d2b
Author: Yash Mayya <ya...@gmail.com>
AuthorDate: Thu Oct 12 09:55:40 2023 +0100

    KAFKA-15570: Add unit tests for MemoryConfigBackingStore (#14518)
    
    Reviewers: Chris Egerton <ch...@aiven.io>, Kalpesh Patel <kp...@confluent.io>
---
 .../storage/MemoryConfigBackingStoreTest.java      | 185 +++++++++++++++++++++
 1 file changed, 185 insertions(+)

diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/MemoryConfigBackingStoreTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/MemoryConfigBackingStoreTest.java
new file mode 100644
index 00000000000..3e449b44cf9
--- /dev/null
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/MemoryConfigBackingStoreTest.java
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.storage;
+
+import org.apache.kafka.connect.runtime.TargetState;
+import org.apache.kafka.connect.util.ConnectorTaskId;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.anySet;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+@RunWith(MockitoJUnitRunner.StrictStubs.class)
+public class MemoryConfigBackingStoreTest {
+
+    private static final List<String> CONNECTOR_IDS = Arrays.asList("connector1", "connector2");
+
+    // Actual values are irrelevant here and can be used as either connector or task configurations
+    private static final List<Map<String, String>> SAMPLE_CONFIGS = Arrays.asList(
+        Collections.singletonMap("config-key-one", "config-value-one"),
+        Collections.singletonMap("config-key-two", "config-value-two"),
+        Collections.singletonMap("config-key-three", "config-value-three")
+    );
+
+    @Mock
+    private ConfigBackingStore.UpdateListener configUpdateListener;
+    private final MemoryConfigBackingStore configStore = new MemoryConfigBackingStore();
+
+    @Before
+    public void setUp() {
+        configStore.setUpdateListener(configUpdateListener);
+    }
+
+    @Test
+    public void testPutConnectorConfig() {
+        configStore.putConnectorConfig(CONNECTOR_IDS.get(0), SAMPLE_CONFIGS.get(0));
+        ClusterConfigState configState = configStore.snapshot();
+
+        assertTrue(configState.contains(CONNECTOR_IDS.get(0)));
+        // Default initial target state of STARTED should be used if no explicit target state is specified
+        assertEquals(TargetState.STARTED, configState.targetState(CONNECTOR_IDS.get(0)));
+        assertEquals(SAMPLE_CONFIGS.get(0), configState.connectorConfig(CONNECTOR_IDS.get(0)));
+        assertEquals(1, configState.connectors().size());
+
+        verify(configUpdateListener).onConnectorConfigUpdate(eq(CONNECTOR_IDS.get(0)));
+    }
+
+    @Test
+    public void testPutConnectorConfigUpdateExisting() {
+        configStore.putConnectorConfig(CONNECTOR_IDS.get(0), SAMPLE_CONFIGS.get(0));
+        ClusterConfigState configState = configStore.snapshot();
+
+        assertTrue(configState.contains(CONNECTOR_IDS.get(0)));
+        // Default initial target state of STARTED should be used if no explicit target state is specified
+        assertEquals(TargetState.STARTED, configState.targetState(CONNECTOR_IDS.get(0)));
+        assertEquals(SAMPLE_CONFIGS.get(0), configState.connectorConfig(CONNECTOR_IDS.get(0)));
+        assertEquals(1, configState.connectors().size());
+
+        configStore.putConnectorConfig(CONNECTOR_IDS.get(0), SAMPLE_CONFIGS.get(1));
+        configState = configStore.snapshot();
+        assertEquals(SAMPLE_CONFIGS.get(1), configState.connectorConfig(CONNECTOR_IDS.get(0)));
+
+        verify(configUpdateListener, times(2)).onConnectorConfigUpdate(eq(CONNECTOR_IDS.get(0)));
+    }
+
+    @Test
+    public void testRemoveConnectorConfig() {
+        configStore.putConnectorConfig(CONNECTOR_IDS.get(0), SAMPLE_CONFIGS.get(0));
+        configStore.putConnectorConfig(CONNECTOR_IDS.get(1), SAMPLE_CONFIGS.get(1));
+        ClusterConfigState configState = configStore.snapshot();
+
+        Set<String> expectedConnectors = new HashSet<>();
+        expectedConnectors.add(CONNECTOR_IDS.get(0));
+        expectedConnectors.add(CONNECTOR_IDS.get(1));
+        assertEquals(expectedConnectors, configState.connectors());
+
+        configStore.removeConnectorConfig(CONNECTOR_IDS.get(1));
+        configState = configStore.snapshot();
+
+        assertFalse(configState.contains(CONNECTOR_IDS.get(1)));
+        assertEquals(1, configState.connectors().size());
+
+        verify(configUpdateListener).onConnectorConfigUpdate(eq(CONNECTOR_IDS.get(0)));
+        verify(configUpdateListener).onConnectorConfigUpdate(eq(CONNECTOR_IDS.get(1)));
+        verify(configUpdateListener).onConnectorConfigRemove(eq(CONNECTOR_IDS.get(1)));
+    }
+
+    @Test
+    public void testPutTaskConfigs() {
+        // Can't write task configs for non-existent connector
+        assertThrows(IllegalArgumentException.class,
+            () -> configStore.putTaskConfigs(CONNECTOR_IDS.get(0), Collections.singletonList(SAMPLE_CONFIGS.get(1))));
+
+        configStore.putConnectorConfig(CONNECTOR_IDS.get(0), SAMPLE_CONFIGS.get(0));
+        configStore.putTaskConfigs(CONNECTOR_IDS.get(0), Collections.singletonList(SAMPLE_CONFIGS.get(1)));
+        ClusterConfigState configState = configStore.snapshot();
+
+        ConnectorTaskId taskId = new ConnectorTaskId(CONNECTOR_IDS.get(0), 0);
+        assertEquals(1, configState.taskCount(CONNECTOR_IDS.get(0)));
+        assertEquals(SAMPLE_CONFIGS.get(1), configState.taskConfig(taskId));
+
+        verify(configUpdateListener).onConnectorConfigUpdate(eq(CONNECTOR_IDS.get(0)));
+        verify(configUpdateListener).onTaskConfigUpdate(eq(Collections.singleton(taskId)));
+    }
+
+    @Test
+    public void testRemoveTaskConfigs() {
+        // Can't remove task configs for non-existent connector
+        assertThrows(IllegalArgumentException.class,
+            () -> configStore.removeTaskConfigs(CONNECTOR_IDS.get(0)));
+
+        // This workaround is required to verify the arguments passed to ConfigBackingStore.UpdateListener::onTaskConfigUpdate because Mockito
+        // records references to the collection instead of a copy and the argument passed to the method the first time in
+        // MemoryConfigBackingStore::putTaskConfigs is cleared in MemoryConfigBackingStore::removeTaskConfigs
+        final List<Collection<ConnectorTaskId>> onTaskConfigUpdateCaptures = new ArrayList<>();
+        doAnswer(invocation -> {
+            onTaskConfigUpdateCaptures.add(new HashSet<>(invocation.getArgument(0)));
+            return null;
+        }).when(configUpdateListener).onTaskConfigUpdate(anySet());
+
+        configStore.putConnectorConfig(CONNECTOR_IDS.get(0), SAMPLE_CONFIGS.get(0));
+        configStore.putTaskConfigs(CONNECTOR_IDS.get(0), Collections.singletonList(SAMPLE_CONFIGS.get(1)));
+        configStore.removeTaskConfigs(CONNECTOR_IDS.get(0));
+        ClusterConfigState configState = configStore.snapshot();
+
+        assertEquals(0, configState.taskCount(CONNECTOR_IDS.get(0)));
+        assertEquals(Collections.emptyList(), configState.tasks(CONNECTOR_IDS.get(0)));
+
+        verify(configUpdateListener).onConnectorConfigUpdate(eq(CONNECTOR_IDS.get(0)));
+        verify(configUpdateListener, times(2)).onTaskConfigUpdate(anySet());
+
+        ConnectorTaskId taskId = new ConnectorTaskId(CONNECTOR_IDS.get(0), 0);
+        assertEquals(Arrays.asList(Collections.singleton(taskId), Collections.singleton(taskId)), onTaskConfigUpdateCaptures);
+    }
+
+    @Test
+    public void testPutTargetState() {
+        // Can't write target state for non-existent connector
+        assertThrows(IllegalArgumentException.class, () -> configStore.putTargetState(CONNECTOR_IDS.get(0), TargetState.PAUSED));
+
+        configStore.putConnectorConfig(CONNECTOR_IDS.get(0), SAMPLE_CONFIGS.get(0));
+        configStore.putTargetState(CONNECTOR_IDS.get(0), TargetState.PAUSED);
+        // Ensure that ConfigBackingStore.UpdateListener::onConnectorTargetStateChange is called only once if the same state is written twice
+        configStore.putTargetState(CONNECTOR_IDS.get(0), TargetState.PAUSED);
+        ClusterConfigState configState = configStore.snapshot();
+
+        assertEquals(TargetState.PAUSED, configState.targetState(CONNECTOR_IDS.get(0)));
+
+        verify(configUpdateListener).onConnectorConfigUpdate(eq(CONNECTOR_IDS.get(0)));
+        verify(configUpdateListener).onConnectorTargetStateChange(eq(CONNECTOR_IDS.get(0)));
+    }
+}