You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ce...@apache.org on 2023/04/10 15:56:08 UTC
[kafka] branch trunk updated: KAFKA-14420: Use incrementalAlterConfigs API for syncing topic configurations in MirrorMaker 2 (KIP-894) (#13373)
This is an automated email from the ASF dual-hosted git repository.
cegerton pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 751a8af1f0f KAFKA-14420: Use incrementalAlterConfigs API for syncing topic configurations in MirrorMaker 2 (KIP-894) (#13373)
751a8af1f0f is described below
commit 751a8af1f0f01b199d4d8e23f0c90edb7b38bd8c
Author: Gantigmaa Selenge <39...@users.noreply.github.com>
AuthorDate: Mon Apr 10 16:55:49 2023 +0100
KAFKA-14420: Use incrementalAlterConfigs API for syncing topic configurations in MirrorMaker 2 (KIP-894) (#13373)
Reviewers: Mickael Maison <mi...@gmail.com>, Chris Egerton <ch...@aiven.io>
---
.../kafka/clients/admin/AdminClientTestUtils.java | 25 +++
.../kafka/connect/mirror/ConfigPropertyFilter.java | 20 +++
.../mirror/DefaultConfigPropertyFilter.java | 23 ++-
.../kafka/connect/mirror/MirrorSourceConfig.java | 28 +++-
.../connect/mirror/MirrorSourceConnector.java | 90 +++++++++-
.../connect/mirror/MirrorSourceConnectorTest.java | 185 ++++++++++++++++++++-
.../MirrorConnectorsIntegrationBaseTest.java | 145 +++++++++++++++-
.../util/clusters/EmbeddedKafkaCluster.java | 22 +++
8 files changed, 516 insertions(+), 22 deletions(-)
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientTestUtils.java b/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientTestUtils.java
index d8b9f427d6b..8b2b20cfaae 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientTestUtils.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientTestUtils.java
@@ -30,6 +30,7 @@ import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.internals.KafkaFutureImpl;
public class AdminClientTestUtils {
@@ -78,6 +79,30 @@ public class AdminClientTestUtils {
return new ListTopicsResult(future);
}
+ /**
+ * Helper to create an AlterConfigsResult instance for a given Throwable.
+ * AlterConfigsResult's constructor is only accessible from within the
+ * admin package.
+ */
+ public static AlterConfigsResult alterConfigsResult(ConfigResource cr, Throwable t) {
+ KafkaFutureImpl<Void> future = new KafkaFutureImpl<>();
+ Map<ConfigResource, KafkaFuture<Void>> futures = Collections.singletonMap(cr, future);
+ future.completeExceptionally(t);
+ return new AlterConfigsResult(futures);
+ }
+
+ /**
+ * Helper to create an AlterConfigsResult instance for a given ConfigResource.
+ * AlterConfigsResult's constructor is only accessible from within the
+ * admin package.
+ */
+ public static AlterConfigsResult alterConfigsResult(ConfigResource cr) {
+ KafkaFutureImpl<Void> future = new KafkaFutureImpl<>();
+ Map<ConfigResource, KafkaFuture<Void>> futures = Collections.singletonMap(cr, future);
+ future.complete(null);
+ return new AlterConfigsResult(futures);
+ }
+
/**
* Helper to create a CreatePartitionsResult instance for a given Throwable.
* CreatePartitionsResult's constructor is only accessible from within the
diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/ConfigPropertyFilter.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/ConfigPropertyFilter.java
index ec6b3b91071..9183ebcf880 100644
--- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/ConfigPropertyFilter.java
+++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/ConfigPropertyFilter.java
@@ -25,8 +25,28 @@ import java.util.Map;
@InterfaceStability.Evolving
public interface ConfigPropertyFilter extends Configurable, AutoCloseable {
+ /**
+ * Specifies whether to replicate the given topic configuration.
+ * Note that if a property has a default value on the source cluster,
+ * {@link #shouldReplicateSourceDefault(String)} will also be called to
+ * determine how that property should be synced.
+ */
boolean shouldReplicateConfigProperty(String prop);
+ /**
+ * Specifies how to replicate the given topic configuration property
+ * that has a default value on the source cluster. Only invoked for properties
+ * that {@link #shouldReplicateConfigProperty(String)} has returned
+ * {@code true} for.
+ *
+ * @return {@code true} if the default value from the source topic should be synced
+ * to the target topic, and {@code false} if the default value for the target topic
+ * should be used instead
+ */
+ default boolean shouldReplicateSourceDefault(String prop) {
+ return false;
+ }
+
default void close() {
//nop
}
diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/DefaultConfigPropertyFilter.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/DefaultConfigPropertyFilter.java
index 0c85f50d000..1ecf0da14b7 100644
--- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/DefaultConfigPropertyFilter.java
+++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/DefaultConfigPropertyFilter.java
@@ -30,6 +30,10 @@ public class DefaultConfigPropertyFilter implements ConfigPropertyFilter {
public static final String CONFIG_PROPERTIES_EXCLUDE_CONFIG = "config.properties.exclude";
public static final String CONFIG_PROPERTIES_EXCLUDE_ALIAS_CONFIG = "config.properties.blacklist";
+ public static final String USE_DEFAULTS_FROM = "use.defaults.from";
+ private static final String USE_DEFAULTS_FROM_DOC = "Which cluster's defaults (source or target) to use "
+ + "when syncing topic configurations that have default values.";
+ private static final String USE_DEFAULTS_FROM_DEFAULT = "target";
private static final String CONFIG_PROPERTIES_EXCLUDE_DOC = "List of topic configuration properties and/or regexes "
+ "that should not be replicated.";
@@ -40,11 +44,13 @@ public class DefaultConfigPropertyFilter implements ConfigPropertyFilter {
+ "unclean\\.leader\\.election\\.enable, "
+ "min\\.insync\\.replicas";
private Pattern excludePattern = MirrorUtils.compilePatternList(CONFIG_PROPERTIES_EXCLUDE_DEFAULT);
+ private String useDefaultsFrom = USE_DEFAULTS_FROM_DEFAULT;
@Override
public void configure(Map<String, ?> props) {
ConfigPropertyFilterConfig config = new ConfigPropertyFilterConfig(props);
excludePattern = config.excludePattern();
+ useDefaultsFrom = config.useDefaultsFrom();
}
@Override
@@ -60,6 +66,11 @@ public class DefaultConfigPropertyFilter implements ConfigPropertyFilter {
return !excluded(prop);
}
+ @Override
+ public boolean shouldReplicateSourceDefault(String prop) {
+ return useDefaultsFrom.equals("source");
+ }
+
static class ConfigPropertyFilterConfig extends AbstractConfig {
static final ConfigDef DEF = new ConfigDef()
@@ -72,7 +83,13 @@ public class DefaultConfigPropertyFilter implements ConfigPropertyFilter {
Type.LIST,
null,
Importance.HIGH,
- "Deprecated. Use " + CONFIG_PROPERTIES_EXCLUDE_CONFIG + " instead.");
+ "Deprecated. Use " + CONFIG_PROPERTIES_EXCLUDE_CONFIG + " instead.")
+ .define(USE_DEFAULTS_FROM,
+ Type.STRING,
+ USE_DEFAULTS_FROM_DEFAULT,
+ Importance.MEDIUM,
+ USE_DEFAULTS_FROM_DOC);
+
ConfigPropertyFilterConfig(Map<String, ?> props) {
super(DEF, ConfigUtils.translateDeprecatedConfigs(props, new String[][]{
@@ -82,5 +99,9 @@ public class DefaultConfigPropertyFilter implements ConfigPropertyFilter {
Pattern excludePattern() {
return MirrorUtils.compilePatternList(getList(CONFIG_PROPERTIES_EXCLUDE_CONFIG));
}
+
+ String useDefaultsFrom() {
+ return getString(USE_DEFAULTS_FROM);
+ }
}
}
diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConfig.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConfig.java
index 3e582b50455..bbdbfed5843 100644
--- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConfig.java
+++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConfig.java
@@ -26,6 +26,8 @@ import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
+import static org.apache.kafka.common.config.ConfigDef.ValidString.in;
+
public class MirrorSourceConfig extends MirrorConnectorConfig {
protected static final String REFRESH_TOPICS = "refresh.topics";
@@ -73,6 +75,19 @@ public class MirrorSourceConfig extends MirrorConnectorConfig {
public static final String SYNC_TOPIC_CONFIGS_INTERVAL_SECONDS = SYNC_TOPIC_CONFIGS + INTERVAL_SECONDS_SUFFIX;
private static final String SYNC_TOPIC_CONFIGS_INTERVAL_SECONDS_DOC = "Frequency of topic config sync.";
public static final long SYNC_TOPIC_CONFIGS_INTERVAL_SECONDS_DEFAULT = 10 * 60;
+ @Deprecated
+ public static final String USE_INCREMENTAL_ALTER_CONFIGS = "use.incremental.alter.configs";
+ private static final String USE_INCREMENTAL_ALTER_CONFIG_DOC = "Deprecated. Which API to use for syncing topic configs. " +
+ "The valid values are 'requested', 'required' and 'never'. " +
+ "By default, set to 'requested', which means the IncrementalAlterConfigs API is being used for syncing topic configurations " +
+ "and if any request receives an error from an incompatible broker, it will fallback to using the deprecated AlterConfigs API. " +
+ "If explicitly set to 'required', the IncrementalAlterConfigs API is used without the fallback logic and " +
+ "if it receives an error from an incompatible broker, the connector will fail. " +
+ "If explicitly set to 'never', the AlterConfigs API is always used. " +
+ "This setting will be removed and the behaviour of 'required' will be used in Kafka 4.0, therefore users should ensure that target broker is at least 2.3.0";
+ public static final String REQUEST_INCREMENTAL_ALTER_CONFIGS = "requested";
+ public static final String REQUIRE_INCREMENTAL_ALTER_CONFIGS = "required";
+ public static final String NEVER_USE_INCREMENTAL_ALTER_CONFIGS = "never";
public static final String SYNC_TOPIC_ACLS_ENABLED = SYNC_TOPIC_ACLS + ENABLED_SUFFIX;
private static final String SYNC_TOPIC_ACLS_ENABLED_DOC = "Whether to periodically configure remote topic ACLs to match their corresponding upstream topics.";
@@ -173,6 +188,10 @@ public class MirrorSourceConfig extends MirrorConnectorConfig {
}
}
+ String useIncrementalAlterConfigs() {
+ return getString(USE_INCREMENTAL_ALTER_CONFIGS);
+ }
+
Duration syncTopicAclsInterval() {
if (getBoolean(SYNC_TOPIC_ACLS_ENABLED)) {
return Duration.ofSeconds(getLong(SYNC_TOPIC_ACLS_INTERVAL_SECONDS));
@@ -279,6 +298,13 @@ public class MirrorSourceConfig extends MirrorConnectorConfig {
SYNC_TOPIC_CONFIGS_INTERVAL_SECONDS_DEFAULT,
ConfigDef.Importance.LOW,
SYNC_TOPIC_CONFIGS_INTERVAL_SECONDS_DOC)
+ .define(
+ USE_INCREMENTAL_ALTER_CONFIGS,
+ ConfigDef.Type.STRING,
+ REQUEST_INCREMENTAL_ALTER_CONFIGS,
+ in(REQUEST_INCREMENTAL_ALTER_CONFIGS, REQUIRE_INCREMENTAL_ALTER_CONFIGS, NEVER_USE_INCREMENTAL_ALTER_CONFIGS),
+ ConfigDef.Importance.LOW,
+ USE_INCREMENTAL_ALTER_CONFIG_DOC)
.define(
SYNC_TOPIC_ACLS_ENABLED,
ConfigDef.Type.BOOLEAN,
@@ -313,7 +339,7 @@ public class MirrorSourceConfig extends MirrorConnectorConfig {
OFFSET_SYNCS_TOPIC_LOCATION,
ConfigDef.Type.STRING,
OFFSET_SYNCS_TOPIC_LOCATION_DEFAULT,
- ConfigDef.ValidString.in(SOURCE_CLUSTER_ALIAS_DEFAULT, TARGET_CLUSTER_ALIAS_DEFAULT),
+ in(SOURCE_CLUSTER_ALIAS_DEFAULT, TARGET_CLUSTER_ALIAS_DEFAULT),
ConfigDef.Importance.LOW,
OFFSET_SYNCS_TOPIC_LOCATION_DOC)
.define(
diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java
index 4dd3c740700..b97fc7bc577 100644
--- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java
+++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java
@@ -16,15 +16,18 @@
*/
package org.apache.kafka.connect.mirror;
+import java.util.HashMap;
import java.util.Locale;
import java.util.Map.Entry;
+import java.util.concurrent.atomic.AtomicReference;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.IsolationLevel;
import org.apache.kafka.common.config.ConfigValue;
import org.apache.kafka.common.errors.SecurityDisabledException;
import org.apache.kafka.connect.connector.Task;
+import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.source.ExactlyOnceSupport;
import org.apache.kafka.connect.source.SourceConnector;
import org.apache.kafka.common.config.ConfigDef;
@@ -41,8 +44,10 @@ import org.apache.kafka.common.resource.ResourcePatternFilter;
import org.apache.kafka.common.resource.PatternType;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.InvalidPartitionsException;
+import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.ConfigEntry;
@@ -97,6 +102,7 @@ public class MirrorSourceConnector extends SourceConnector {
private Admin sourceAdminClient;
private Admin targetAdminClient;
private Admin offsetSyncsAdminClient;
+ private volatile boolean useIncrementalAlterConfigs;
private AtomicBoolean noAclAuthorizer = new AtomicBoolean(false);
public MirrorSourceConnector() {
@@ -118,6 +124,18 @@ public class MirrorSourceConnector extends SourceConnector {
this.configPropertyFilter = configPropertyFilter;
}
+ // visible for testing the deprecated setting "use.incremental.alter.configs"
+ // this constructor should be removed when the deprecated setting is removed in Kafka 4.0
+ MirrorSourceConnector(SourceAndTarget sourceAndTarget, ReplicationPolicy replicationPolicy,
+ MirrorSourceConfig config, ConfigPropertyFilter configPropertyFilter, Admin targetAdmin) {
+ this.sourceAndTarget = sourceAndTarget;
+ this.replicationPolicy = replicationPolicy;
+ this.configPropertyFilter = configPropertyFilter;
+ this.config = config;
+ this.useIncrementalAlterConfigs = !config.useIncrementalAlterConfigs().equals(MirrorSourceConfig.NEVER_USE_INCREMENTAL_ALTER_CONFIGS);
+ this.targetAdminClient = targetAdmin;
+ }
+
// visible for testing
MirrorSourceConnector(Admin sourceAdminClient, Admin targetAdminClient) {
this.sourceAdminClient = sourceAdminClient;
@@ -139,6 +157,7 @@ public class MirrorSourceConnector extends SourceConnector {
replicationFactor = config.replicationFactor();
sourceAdminClient = config.forwardingAdmin(config.sourceAdminConfig("replication-source-admin"));
targetAdminClient = config.forwardingAdmin(config.targetAdminConfig("replication-target-admin"));
+ useIncrementalAlterConfigs = !config.useIncrementalAlterConfigs().equals(MirrorSourceConfig.NEVER_USE_INCREMENTAL_ALTER_CONFIGS);
offsetSyncsAdminClient = config.forwardingAdmin(config.offsetSyncsTopicAdminConfig());
scheduler = new Scheduler(getClass(), config.entityLabel(), config.adminTimeout());
scheduler.execute(this::createOffsetSyncsTopic, "creating upstream offset-syncs topic");
@@ -366,12 +385,18 @@ public class MirrorSourceConnector extends SourceConnector {
updateTopicAcls(filteredBindings);
}
- private void syncTopicConfigs()
+ // visible for testing
+ void syncTopicConfigs()
throws InterruptedException, ExecutionException {
+ boolean incremental = useIncrementalAlterConfigs;
Map<String, Config> sourceConfigs = describeTopicConfigs(topicsBeingReplicated());
Map<String, Config> targetConfigs = sourceConfigs.entrySet().stream()
- .collect(Collectors.toMap(x -> formatRemoteTopic(x.getKey()), x -> targetConfig(x.getValue())));
- updateTopicConfigs(targetConfigs);
+ .collect(Collectors.toMap(x -> formatRemoteTopic(x.getKey()), x -> targetConfig(x.getValue(), incremental)));
+ if (incremental) {
+ incrementalAlterConfigs(targetConfigs);
+ } else {
+ deprecatedAlterConfigs(targetConfigs);
+ }
}
private void createOffsetSyncsTopic() {
@@ -432,7 +457,7 @@ public class MirrorSourceConnector extends SourceConnector {
.map(sourceTopic -> {
String remoteTopic = formatRemoteTopic(sourceTopic);
int partitionCount = sourceTopicToPartitionCounts.get(sourceTopic).intValue();
- Map<String, String> configs = configToMap(targetConfig(sourceTopicToConfig.get(sourceTopic)));
+ Map<String, String> configs = configToMap(targetConfig(sourceTopicToConfig.get(sourceTopic), false));
return new NewTopic(remoteTopic, partitionCount, (short) replicationFactor)
.configs(configs);
})
@@ -501,9 +526,10 @@ public class MirrorSourceConnector extends SourceConnector {
.collect(Collectors.toMap(ConfigEntry::name, ConfigEntry::value));
}
- @SuppressWarnings("deprecation")
+ // visible for testing
// use deprecated alterConfigs API for broker compatibility back to 0.11.0
- private void updateTopicConfigs(Map<String, Config> topicConfigs) {
+ @SuppressWarnings("deprecation")
+ void deprecatedAlterConfigs(Map<String, Config> topicConfigs) {
Map<ConfigResource, Config> configs = topicConfigs.entrySet().stream()
.collect(Collectors.toMap(x ->
new ConfigResource(ConfigResource.Type.TOPIC, x.getKey()), Entry::getValue));
@@ -515,6 +541,46 @@ public class MirrorSourceConnector extends SourceConnector {
}));
}
+ // visible for testing
+ void incrementalAlterConfigs(Map<String, Config> topicConfigs) {
+ Map<ConfigResource, Collection<AlterConfigOp>> configOps = new HashMap<>();
+ for (Map.Entry<String, Config> topicConfig : topicConfigs.entrySet()) {
+ Collection<AlterConfigOp> ops = new ArrayList<>();
+ ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, topicConfig.getKey());
+ for (ConfigEntry config : topicConfig.getValue().entries()) {
+ if (config.isDefault() && !shouldReplicateSourceDefault(config.name())) {
+ ops.add(new AlterConfigOp(config, AlterConfigOp.OpType.DELETE));
+ } else {
+ ops.add(new AlterConfigOp(config, AlterConfigOp.OpType.SET));
+ }
+ }
+ configOps.put(configResource, ops);
+ }
+ log.trace("Syncing configs for {} topics.", configOps.size());
+ AtomicReference<Boolean> encounteredError = new AtomicReference<>(false);
+ targetAdminClient.incrementalAlterConfigs(configOps).values().forEach((k, v) -> v.whenComplete((x, e) -> {
+ if (e != null) {
+ if (config.useIncrementalAlterConfigs().equals(MirrorSourceConfig.REQUEST_INCREMENTAL_ALTER_CONFIGS)
+ && e instanceof UnsupportedVersionException && !encounteredError.get()) {
+ //Fallback logic
+ log.warn("The target cluster {} is not compatible with IncrementalAlterConfigs API. "
+ + "Therefore using deprecated AlterConfigs API for syncing configs for topic {}",
+ sourceAndTarget.target(), k.name(), e);
+ encounteredError.set(true);
+ useIncrementalAlterConfigs = false;
+ } else if (config.useIncrementalAlterConfigs().equals(MirrorSourceConfig.REQUIRE_INCREMENTAL_ALTER_CONFIGS)
+ && e instanceof UnsupportedVersionException && !encounteredError.get()) {
+ log.error("Failed to sync configs for topic {} on cluster {} with IncrementalAlterConfigs API", k.name(), sourceAndTarget.target(), e);
+ encounteredError.set(true);
+ context.raiseError(new ConnectException("use.incremental.alter.configs was set to \"required\", but the target cluster '"
+ + sourceAndTarget.target() + "' is not compatible with IncrementalAlterConfigs API", e));
+ } else {
+ log.warn("Could not alter configuration of topic {}.", k.name(), e);
+ }
+ }
+ }));
+ }
+
private void updateTopicAcls(List<AclBinding> bindings) {
log.trace("Syncing {} topic ACL bindings.", bindings.size());
targetAdminClient.createAcls(bindings).values().forEach((k, v) -> v.whenComplete((x, e) -> {
@@ -539,9 +605,13 @@ public class MirrorSourceConnector extends SourceConnector {
.collect(Collectors.toMap(x -> x.getKey().name(), Entry::getValue));
}
- Config targetConfig(Config sourceConfig) {
+ Config targetConfig(Config sourceConfig, boolean incremental) {
+ // If using incrementalAlterConfigs API, sync the default property with either SET or DELETE action determined by ConfigPropertyFilter::shouldReplicateSourceDefault later.
+ // If not using incrementalAlterConfigs API, sync the default property only if ConfigPropertyFilter::shouldReplicateSourceDefault returns true.
+ // If ConfigPropertyFilter::shouldReplicateConfigProperty returns false, do not sync the property at all.
List<ConfigEntry> entries = sourceConfig.entries().stream()
- .filter(x -> !x.isDefault() && !x.isReadOnly() && !x.isSensitive())
+ .filter(x -> incremental || (x.isDefault() && shouldReplicateSourceDefault(x.name())) || !x.isDefault())
+ .filter(x -> !x.isReadOnly() && !x.isSensitive())
.filter(x -> x.source() != ConfigEntry.ConfigSource.STATIC_BROKER_CONFIG)
.filter(x -> shouldReplicateTopicConfigurationProperty(x.name()))
.collect(Collectors.toList());
@@ -578,6 +648,10 @@ public class MirrorSourceConnector extends SourceConnector {
return configPropertyFilter.shouldReplicateConfigProperty(property);
}
+ boolean shouldReplicateSourceDefault(String property) {
+ return configPropertyFilter.shouldReplicateSourceDefault(property);
+ }
+
// Recurse upstream to detect cycles, i.e. whether this topic is already on the target cluster
boolean isCycle(String topic) {
String source = replicationPolicy.topicSource(topic);
diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceConnectorTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceConnectorTest.java
index dfe1f524f22..b178fa34b6f 100644
--- a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceConnectorTest.java
+++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceConnectorTest.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.connect.mirror;
+import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.DescribeAclsResult;
import org.apache.kafka.common.KafkaFuture;
@@ -24,7 +25,9 @@ import org.apache.kafka.common.acl.AccessControlEntry;
import org.apache.kafka.common.acl.AclBinding;
import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.acl.AclPermissionType;
+import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.config.ConfigValue;
+import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.errors.SecurityDisabledException;
import org.apache.kafka.common.resource.PatternType;
import org.apache.kafka.common.resource.ResourcePattern;
@@ -35,9 +38,11 @@ import org.apache.kafka.connect.connector.ConnectorContext;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.source.ExactlyOnceSupport;
import org.junit.jupiter.api.Test;
+import static org.apache.kafka.clients.admin.AdminClientTestUtils.alterConfigsResult;
import static org.apache.kafka.clients.consumer.ConsumerConfig.ISOLATION_LEVEL_CONFIG;
import static org.apache.kafka.connect.mirror.MirrorConnectorConfig.CONSUMER_CLIENT_PREFIX;
import static org.apache.kafka.connect.mirror.MirrorConnectorConfig.SOURCE_PREFIX;
@@ -50,12 +55,14 @@ import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.mockito.ArgumentMatchers.isA;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.eq;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
@@ -74,6 +81,19 @@ import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
public class MirrorSourceConnectorTest {
+ private ConfigPropertyFilter getConfigPropertyFilter() {
+ return new ConfigPropertyFilter() {
+ @Override
+ public boolean shouldReplicateConfigProperty(String prop) {
+ return true;
+ }
+
+ @Override
+ public boolean shouldReplicateSourceDefault(String prop) {
+ return false;
+ }
+ };
+ }
@Test
public void testReplicatesHeartbeatsByDefault() {
@@ -94,7 +114,7 @@ public class MirrorSourceConnectorTest {
@Test
public void testNoCycles() {
MirrorSourceConnector connector = new MirrorSourceConnector(new SourceAndTarget("source", "target"),
- new DefaultReplicationPolicy(), x -> true, x -> true);
+ new DefaultReplicationPolicy(), x -> true, getConfigPropertyFilter());
assertFalse(connector.shouldReplicateTopic("target.topic1"), "should not allow cycles");
assertFalse(connector.shouldReplicateTopic("target.source.topic1"), "should not allow cycles");
assertFalse(connector.shouldReplicateTopic("source.target.topic1"), "should not allow cycles");
@@ -107,7 +127,7 @@ public class MirrorSourceConnectorTest {
@Test
public void testIdentityReplication() {
MirrorSourceConnector connector = new MirrorSourceConnector(new SourceAndTarget("source", "target"),
- new IdentityReplicationPolicy(), x -> true, x -> true);
+ new IdentityReplicationPolicy(), x -> true, getConfigPropertyFilter());
assertTrue(connector.shouldReplicateTopic("target.topic1"), "should allow cycles");
assertTrue(connector.shouldReplicateTopic("target.source.topic1"), "should allow cycles");
assertTrue(connector.shouldReplicateTopic("source.target.topic1"), "should allow cycles");
@@ -127,7 +147,7 @@ public class MirrorSourceConnectorTest {
@Test
public void testAclFiltering() {
MirrorSourceConnector connector = new MirrorSourceConnector(new SourceAndTarget("source", "target"),
- new DefaultReplicationPolicy(), x -> true, x -> true);
+ new DefaultReplicationPolicy(), x -> true, getConfigPropertyFilter());
assertFalse(connector.shouldReplicateAcl(
new AclBinding(new ResourcePattern(ResourceType.TOPIC, "test_topic", PatternType.LITERAL),
new AccessControlEntry("kafka", "", AclOperation.WRITE, AclPermissionType.ALLOW))), "should not replicate ALLOW WRITE");
@@ -139,7 +159,7 @@ public class MirrorSourceConnectorTest {
@Test
public void testAclTransformation() {
MirrorSourceConnector connector = new MirrorSourceConnector(new SourceAndTarget("source", "target"),
- new DefaultReplicationPolicy(), x -> true, x -> true);
+ new DefaultReplicationPolicy(), x -> true, getConfigPropertyFilter());
AclBinding allowAllAclBinding = new AclBinding(
new ResourcePattern(ResourceType.TOPIC, "test_topic", PatternType.LITERAL),
new AccessControlEntry("kafka", "", AclOperation.ALL, AclPermissionType.ALLOW));
@@ -209,15 +229,62 @@ public class MirrorSourceConnectorTest {
new DefaultReplicationPolicy(), x -> true, new DefaultConfigPropertyFilter());
ArrayList<ConfigEntry> entries = new ArrayList<>();
entries.add(new ConfigEntry("name-1", "value-1"));
+ entries.add(new ConfigEntry("name-2", "value-2", ConfigEntry.ConfigSource.DEFAULT_CONFIG, false, false, Collections.emptyList(), ConfigEntry.ConfigType.STRING, ""));
+ entries.add(new ConfigEntry("min.insync.replicas", "2"));
+ Config config = new Config(entries);
+ Config targetConfig = connector.targetConfig(config, true);
+ assertTrue(targetConfig.entries().stream()
+ .anyMatch(x -> x.name().equals("name-1")), "should replicate properties");
+ assertTrue(targetConfig.entries().stream()
+ .anyMatch(x -> x.name().equals("name-2")), "should include default properties");
+ assertFalse(targetConfig.entries().stream()
+ .anyMatch(x -> x.name().equals("min.insync.replicas")), "should not replicate excluded properties");
+ }
+
+ @Test
+ @Deprecated
+ public void testConfigPropertyFilteringWithAlterConfigs() {
+ MirrorSourceConnector connector = new MirrorSourceConnector(new SourceAndTarget("source", "target"),
+ new DefaultReplicationPolicy(), x -> true, new DefaultConfigPropertyFilter());
+ List<ConfigEntry> entries = new ArrayList<>();
+ entries.add(new ConfigEntry("name-1", "value-1"));
+ // When "use.defaults.from" is left at its default of "target", the config with a default value should be excluded
+ entries.add(new ConfigEntry("name-2", "value-2", ConfigEntry.ConfigSource.DEFAULT_CONFIG, false, false, Collections.emptyList(), ConfigEntry.ConfigType.STRING, ""));
entries.add(new ConfigEntry("min.insync.replicas", "2"));
Config config = new Config(entries);
- Config targetConfig = connector.targetConfig(config);
+ Config targetConfig = connector.targetConfig(config, false);
assertTrue(targetConfig.entries().stream()
.anyMatch(x -> x.name().equals("name-1")), "should replicate properties");
+ assertFalse(targetConfig.entries().stream()
+ .anyMatch(x -> x.name().equals("name-2")), "should not replicate default properties");
assertFalse(targetConfig.entries().stream()
.anyMatch(x -> x.name().equals("min.insync.replicas")), "should not replicate excluded properties");
}
+ @Test
+ @Deprecated
+ public void testConfigPropertyFilteringWithAlterConfigsAndSourceDefault() {
+ Map<String, Object> filterConfig = Collections.singletonMap(DefaultConfigPropertyFilter.USE_DEFAULTS_FROM, "source");
+ DefaultConfigPropertyFilter filter = new DefaultConfigPropertyFilter();
+ filter.configure(filterConfig);
+
+ MirrorSourceConnector connector = new MirrorSourceConnector(new SourceAndTarget("source", "target"),
+ new DefaultReplicationPolicy(), x -> true, filter);
+ List<ConfigEntry> entries = new ArrayList<>();
+ entries.add(new ConfigEntry("name-1", "value-1"));
+ // When "use.defaults.from" is explicitly set to "source", the config with a default value should be replicated
+ entries.add(new ConfigEntry("name-2", "value-2", ConfigEntry.ConfigSource.DEFAULT_CONFIG, false, false, Collections.emptyList(), ConfigEntry.ConfigType.STRING, ""));
+ entries.add(new ConfigEntry("min.insync.replicas", "2"));
+ Config config = new Config(entries);
+ Config targetConfig = connector.targetConfig(config, false);
+ assertTrue(targetConfig.entries().stream()
+ .anyMatch(x -> x.name().equals("name-1")), "should replicate properties");
+ assertTrue(targetConfig.entries().stream()
+ .anyMatch(x -> x.name().equals("name-2")), "should include default properties");
+ assertFalse(targetConfig.entries().stream()
+ .anyMatch(x -> x.name().equals("min.insync.replicas")), "should not replicate excluded properties");
+ }
+
@Test
public void testNewTopicConfigs() throws Exception {
Map<String, Object> filterConfig = new HashMap<>();
@@ -261,6 +328,114 @@ public class MirrorSourceConnectorTest {
verify(connector).createNewTopics(any(), any());
}
+ @Test
+ @Deprecated
+ public void testIncrementalAlterConfigsRequested() throws Exception {
+ Map<String, String> props = makeProps();
+ props.put(MirrorSourceConfig.USE_INCREMENTAL_ALTER_CONFIGS, MirrorSourceConfig.REQUEST_INCREMENTAL_ALTER_CONFIGS);
+ MirrorSourceConfig connectorConfig = new MirrorSourceConfig(props);
+
+ Admin admin = mock(Admin.class);
+ MirrorSourceConnector connector = spy(new MirrorSourceConnector(new SourceAndTarget("source", "target"),
+ new DefaultReplicationPolicy(), connectorConfig, new DefaultConfigPropertyFilter(), admin));
+ final String topic = "testtopic";
+ List<ConfigEntry> entries = Collections.singletonList(new ConfigEntry("name-1", "value-1"));
+ Config config = new Config(entries);
+ doReturn(Collections.singletonMap(topic, config)).when(connector).describeTopicConfigs(any());
+ doReturn(alterConfigsResult(new ConfigResource(ConfigResource.Type.TOPIC, topic), new UnsupportedVersionException("Unsupported API"))).when(admin).incrementalAlterConfigs(any());
+ doNothing().when(connector).deprecatedAlterConfigs(any());
+ connector.syncTopicConfigs();
+ Map<String, Config> topicConfigs = Collections.singletonMap("source." + topic, config);
+ verify(connector).incrementalAlterConfigs(topicConfigs);
+
+ // the next time we sync topic configurations, expect to use the deprecated API
+ connector.syncTopicConfigs();
+ verify(connector, times(1)).deprecatedAlterConfigs(topicConfigs);
+ }
+
+ @Test
+ @Deprecated
+ public void testIncrementalAlterConfigsRequired() throws Exception {
+ Map<String, String> props = makeProps();
+ props.put(MirrorSourceConfig.USE_INCREMENTAL_ALTER_CONFIGS, MirrorSourceConfig.REQUIRE_INCREMENTAL_ALTER_CONFIGS);
+ MirrorSourceConfig connectorConfig = new MirrorSourceConfig(props);
+
+ Admin admin = mock(Admin.class);
+ MirrorSourceConnector connector = spy(new MirrorSourceConnector(new SourceAndTarget("source", "target"),
+ new DefaultReplicationPolicy(), connectorConfig, new DefaultConfigPropertyFilter(), admin));
+ final String topic = "testtopic";
+ List<ConfigEntry> entries = new ArrayList<>();
+ ConfigEntry entryWithNonDefaultValue = new ConfigEntry("name-1", "value-1");
+ ConfigEntry entryWithDefaultValue = new ConfigEntry("name-2", "value-2", ConfigEntry.ConfigSource.DEFAULT_CONFIG, false, false,
+ Collections.emptyList(), ConfigEntry.ConfigType.STRING, "");
+ entries.add(entryWithNonDefaultValue);
+ entries.add(entryWithDefaultValue);
+ Config config = new Config(entries);
+ doReturn(Collections.singletonMap(topic, config)).when(connector).describeTopicConfigs(any());
+
+ doAnswer(invocation -> {
+ Map<ConfigResource, Collection<AlterConfigOp>> configOps = invocation.getArgument(0);
+ assertNotNull(configOps);
+ assertEquals(1, configOps.size());
+
+ ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, "source." + topic);
+ Collection<AlterConfigOp> ops = new ArrayList<>();
+ ops.add(new AlterConfigOp(entryWithNonDefaultValue, AlterConfigOp.OpType.SET));
+ ops.add(new AlterConfigOp(entryWithDefaultValue, AlterConfigOp.OpType.DELETE));
+
+ assertEquals(ops, configOps.get(configResource));
+
+ return alterConfigsResult(configResource);
+ }).when(admin).incrementalAlterConfigs(any());
+
+ connector.syncTopicConfigs();
+ Map<String, Config> topicConfigs = Collections.singletonMap("source." + topic, config);
+ verify(connector).incrementalAlterConfigs(topicConfigs);
+ }
+
+ @Test
+ @Deprecated
+ public void testIncrementalAlterConfigsRequiredButUnsupported() throws Exception {
+ Map<String, String> props = makeProps();
+ props.put(MirrorSourceConfig.USE_INCREMENTAL_ALTER_CONFIGS, MirrorSourceConfig.REQUIRE_INCREMENTAL_ALTER_CONFIGS);
+ MirrorSourceConfig connectorConfig = new MirrorSourceConfig(props);
+
+ Admin admin = mock(Admin.class);
+ ConnectorContext connectorContext = mock(ConnectorContext.class);
+ MirrorSourceConnector connector = spy(new MirrorSourceConnector(new SourceAndTarget("source", "target"),
+ new DefaultReplicationPolicy(), connectorConfig, new DefaultConfigPropertyFilter(), admin));
+ connector.initialize(connectorContext);
+ final String topic = "testtopic";
+ List<ConfigEntry> entries = Collections.singletonList(new ConfigEntry("name-1", "value-1"));
+ Config config = new Config(entries);
+ doReturn(Collections.singletonMap(topic, config)).when(connector).describeTopicConfigs(any());
+ doReturn(alterConfigsResult(new ConfigResource(ConfigResource.Type.TOPIC, topic), new UnsupportedVersionException("Unsupported API"))).when(admin).incrementalAlterConfigs(any());
+
+ connector.syncTopicConfigs();
+ verify(connectorContext).raiseError(isA(ConnectException.class));
+ }
+
+
+ @Test
+ @Deprecated
+ public void testIncrementalAlterConfigsNeverUsed() throws Exception {
+ Map<String, String> props = makeProps();
+ props.put(MirrorSourceConfig.USE_INCREMENTAL_ALTER_CONFIGS, MirrorSourceConfig.NEVER_USE_INCREMENTAL_ALTER_CONFIGS);
+ MirrorSourceConfig connectorConfigs = new MirrorSourceConfig(props);
+
+ MirrorSourceConnector connector = spy(new MirrorSourceConnector(new SourceAndTarget("source", "target"),
+ new DefaultReplicationPolicy(), connectorConfigs, new DefaultConfigPropertyFilter(), null));
+ final String topic = "testtopic";
+ List<ConfigEntry> entries = Collections.singletonList(new ConfigEntry("name-1", "value-1"));
+ Config config = new Config(entries);
+ doReturn(Collections.singletonMap(topic, config)).when(connector).describeTopicConfigs(any());
+ doNothing().when(connector).deprecatedAlterConfigs(any());
+ connector.syncTopicConfigs();
+ Map<String, Config> topicConfigs = Collections.singletonMap("source." + topic, config);
+ verify(connector).deprecatedAlterConfigs(topicConfigs);
+ verify(connector, never()).incrementalAlterConfigs(any());
+ }
+
@Test
public void testMirrorSourceConnectorTaskConfig() {
List<TopicPartition> knownSourceTopicPartitions = new ArrayList<>();
diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java
index 27579d3275f..802b02bfe32 100644
--- a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java
+++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java
@@ -18,7 +18,9 @@ package org.apache.kafka.connect.mirror.integration;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.Config;
+import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.clients.admin.DescribeConfigsResult;
import org.apache.kafka.clients.admin.ListOffsetsResult;
import org.apache.kafka.clients.admin.OffsetSpec;
@@ -171,6 +173,8 @@ public class MirrorConnectorsIntegrationBaseTest {
// exclude topic config:
mm2Props.put(DefaultConfigPropertyFilter.CONFIG_PROPERTIES_EXCLUDE_CONFIG, "delete\\.retention\\..*");
+ // use a short topic config sync interval for testing
+ mm2Props.put("sync.topic.configs.interval.seconds", "1");
mm2Config = new MirrorMakerConfig(mm2Props);
primaryWorkerProps = mm2Config.workerConfig(new SourceAndTarget(BACKUP_CLUSTER_ALIAS, PRIMARY_CLUSTER_ALIAS));
@@ -263,7 +267,7 @@ public class MirrorConnectorsIntegrationBaseTest {
String reverseTopic1 = remoteTopicName("test-topic-1", BACKUP_CLUSTER_ALIAS);
String consumerGroupName = "consumer-group-testReplication";
Map<String, Object> consumerProps = Collections.singletonMap("group.id", consumerGroupName);
- // warm up consumers before starting the connectors so we don't need to wait for discovery
+ // warm up consumers before starting the connectors, so we don't need to wait for discovery
warmUpConsumer(consumerProps);
mm2Config = new MirrorMakerConfig(mm2Props);
@@ -355,7 +359,6 @@ public class MirrorConnectorsIntegrationBaseTest {
assertTrue(primaryConsumer.position(
new TopicPartition(reverseTopic1, 0)) <= NUM_RECORDS_PRODUCED, "Consumer failedback beyond expected downstream offset.");
}
-
}
// create more matching topics
@@ -399,7 +402,7 @@ public class MirrorConnectorsIntegrationBaseTest {
// produce to all test-topic-empty's partitions, except the last partition
produceMessages(primary, topic, NUM_PARTITIONS - 1);
- // consume before starting the connectors so we don't need to wait for discovery
+ // consume before starting the connectors, so we don't need to wait for discovery
int expectedRecords = NUM_RECORDS_PER_PARTITION * (NUM_PARTITIONS - 1);
try (Consumer<byte[], byte[]> primaryConsumer = primary.kafka().createConsumerAndSubscribeTo(consumerProps, topic)) {
waitForConsumingAllRecords(primaryConsumer, expectedRecords);
@@ -452,7 +455,7 @@ public class MirrorConnectorsIntegrationBaseTest {
put("group.id", consumerGroupName);
put("auto.offset.reset", "earliest");
}};
- // create consumers before starting the connectors so we don't need to wait for discovery
+ // create consumers before starting the connectors, so we don't need to wait for discovery
try (Consumer<byte[], byte[]> primaryConsumer = primary.kafka().createConsumerAndSubscribeTo(consumerProps,
"test-topic-1")) {
// we need to wait for consuming all the records for MM2 replicating the expected offsets
@@ -483,7 +486,6 @@ public class MirrorConnectorsIntegrationBaseTest {
waitForConsumerGroupFullSync(backup, Collections.singletonList(backupTopic1),
consumerGroupName, NUM_RECORDS_PRODUCED, offsetLagMax);
-
assertDownstreamRedeliveriesBoundedByMaxLag(backupConsumer, offsetLagMax);
}
@@ -503,7 +505,7 @@ public class MirrorConnectorsIntegrationBaseTest {
waitForConsumingAllRecords(consumer1, NUM_RECORDS_PRODUCED);
}
- // create a consumer at backup cluster with same consumer group Id to consume old and new topic
+ // create a consumer at backup cluster with same consumer group ID to consume old and new topic
try (Consumer<byte[], byte[]> backupConsumer = backup.kafka().createConsumerAndSubscribeTo(Collections.singletonMap(
"group.id", consumerGroupName), backupTopic1, remoteTopic2)) {
@@ -655,6 +657,135 @@ public class MirrorConnectorsIntegrationBaseTest {
assertMonotonicCheckpoints(backup, "primary.checkpoints.internal");
}
+ @Test
+ public void testSyncTopicConfigs() throws InterruptedException {
+ mm2Config = new MirrorMakerConfig(mm2Props);
+
+ waitUntilMirrorMakerIsRunning(backup, CONNECTOR_LIST, mm2Config, PRIMARY_CLUSTER_ALIAS, BACKUP_CLUSTER_ALIAS);
+
+ // create topic with configuration to test:
+ final Map<String, String> topicConfig = new HashMap<>();
+ topicConfig.put("delete.retention.ms", "1000"); // should be excluded (default value is 86400000)
+ topicConfig.put("retention.bytes", "1000"); // should be included, default value is -1
+
+ final String topic = "test-topic-with-config";
+ final String backupTopic = remoteTopicName(topic, PRIMARY_CLUSTER_ALIAS);
+
+ primary.kafka().createTopic(topic, NUM_PARTITIONS, 1, topicConfig);
+ waitForTopicCreated(backup, backupTopic);
+
+ // alter configs on the target topic
+ ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, backupTopic);
+ Collection<AlterConfigOp> ops = new ArrayList<>();
+ ops.add(new AlterConfigOp(new ConfigEntry("delete.retention.ms", "2000"), AlterConfigOp.OpType.SET));
+ ops.add(new AlterConfigOp(new ConfigEntry("retention.bytes", "2000"), AlterConfigOp.OpType.SET));
+ Map<ConfigResource, Collection<AlterConfigOp>> configOps = Collections.singletonMap(configResource, ops);
+ // alter configs on target cluster
+ backup.kafka().incrementalAlterConfigs(configOps);
+
+ waitForCondition(() -> {
+ String primaryConfig, backupConfig;
+ primaryConfig = getTopicConfig(primary.kafka(), topic, "delete.retention.ms");
+ backupConfig = getTopicConfig(backup.kafka(), backupTopic, "delete.retention.ms");
+ assertNotEquals(primaryConfig, backupConfig,
+ "`delete.retention.ms` should be different, because it's in exclude filter! ");
+ assertEquals("2000", backupConfig, "`delete.retention.ms` should be 2000, because it's explicitly defined on the target topic! ");
+
+ // regression test for the configs that are still supposed to be replicated
+ primaryConfig = getTopicConfig(primary.kafka(), topic, "retention.bytes");
+ backupConfig = getTopicConfig(backup.kafka(), backupTopic, "retention.bytes");
+ assertEquals(primaryConfig, backupConfig,
+ "`retention.bytes` should be the same, because it isn't in exclude filter! ");
+ return true;
+ }, 30000, "Topic configurations were not synced");
+ }
+
+ @Test
+ public void testReplicateSourceDefault() throws Exception {
+ mm2Props.put(DefaultConfigPropertyFilter.USE_DEFAULTS_FROM, "source");
+ mm2Config = new MirrorMakerConfig(mm2Props);
+
+ waitUntilMirrorMakerIsRunning(backup, CONNECTOR_LIST, mm2Config, PRIMARY_CLUSTER_ALIAS, BACKUP_CLUSTER_ALIAS);
+
+ // create topic with default configurations to test
+ final String topic = "test-topic-with-config";
+ final String backupTopic = remoteTopicName(topic, PRIMARY_CLUSTER_ALIAS);
+
+ primary.kafka().createTopic(topic, NUM_PARTITIONS, 1, new HashMap<>());
+ waitForTopicCreated(backup, backupTopic);
+
+ // alter target topic configurations
+ ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, backupTopic);
+ Collection<AlterConfigOp> ops = new ArrayList<>();
+ ops.add(new AlterConfigOp(new ConfigEntry("delete.retention.ms", "2000"), AlterConfigOp.OpType.SET));
+ ops.add(new AlterConfigOp(new ConfigEntry("retention.bytes", "2000"), AlterConfigOp.OpType.SET));
+ Map<ConfigResource, Collection<AlterConfigOp>> configOps = Collections.singletonMap(configResource, ops);
+ backup.kafka().incrementalAlterConfigs(configOps);
+
+ waitForCondition(() -> {
+ String primaryConfig, backupConfig;
+ // altered configuration of the target topic should be synced with the source cluster's default
+ primaryConfig = getTopicConfig(primary.kafka(), topic, "retention.bytes");
+ backupConfig = getTopicConfig(backup.kafka(), backupTopic, "retention.bytes");
+ assertEquals(primaryConfig, backupConfig,
+ "`retention.bytes` should be the same, because the source cluster default is being used! ");
+ assertEquals("-1", backupConfig,
+ "`retention.bytes` should be synced with default value!");
+
+ // when using the source cluster's default, the excluded configuration should still not be changed
+ primaryConfig = getTopicConfig(primary.kafka(), topic, "delete.retention.ms");
+ backupConfig = getTopicConfig(backup.kafka(), backupTopic, "delete.retention.ms");
+ assertNotEquals(primaryConfig, backupConfig,
+ "`delete.retention.ms` should be different, because it's in exclude filter! ");
+ assertEquals("2000", backupConfig, "`delete.retention.ms` should be 2000, because it's explicitly defined on the target topic! ");
+ return true;
+ }, 30000, "Topic configurations were not synced");
+ }
+
+    @Test
+    public void testReplicateTargetDefault() throws Exception {
+        mm2Config = new MirrorMakerConfig(mm2Props);
+
+        waitUntilMirrorMakerIsRunning(backup, CONNECTOR_LIST, mm2Config, PRIMARY_CLUSTER_ALIAS, BACKUP_CLUSTER_ALIAS);
+
+        // create the source topic with an explicit `retention.bytes` override (the default is -1)
+        final Map<String, String> topicConfig = new HashMap<>();
+        topicConfig.put("retention.bytes", "1000");
+
+        final String topic = "test-topic-with-config";
+        final String backupTopic = remoteTopicName(topic, PRIMARY_CLUSTER_ALIAS);
+
+        primary.kafka().createTopic(topic, NUM_PARTITIONS, 1, topicConfig);
+        waitForTopicCreated(backup, backupTopic);
+
+        waitForCondition(() -> {
+            String primaryConfig, backupConfig;
+            primaryConfig = getTopicConfig(primary.kafka(), topic, "retention.bytes");
+            backupConfig = getTopicConfig(backup.kafka(), backupTopic, "retention.bytes");
+            assertEquals(primaryConfig, backupConfig,
+                    "`retention.bytes` should be the same");
+            assertEquals("1000", backupConfig,
+                    "`retention.bytes` should be synced with the value explicitly set on the source topic!");
+            return true;
+        }, 30000, "Topic configurations were not synced");
+
+        // remove the explicit `retention.bytes` override from the source topic
+        ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, topic);
+        Collection<AlterConfigOp> ops = new ArrayList<>();
+        ops.add(new AlterConfigOp(new ConfigEntry("retention.bytes", "1000"), AlterConfigOp.OpType.DELETE));
+        Map<ConfigResource, Collection<AlterConfigOp>> configOps = Collections.singletonMap(configResource, ops);
+        primary.kafka().incrementalAlterConfigs(configOps);
+
+        waitForCondition(() -> {
+            String backupConfig;
+            // with the override gone, the target topic should be reset to the target cluster's default
+            backupConfig = getTopicConfig(backup.kafka(), backupTopic, "retention.bytes");
+            assertEquals("-1", backupConfig,
+                    "`retention.bytes` should be synced with target's default value!");
+            return true;
+        }, 30000, "Topic configurations were not synced");
+    }
+
private TopicPartition remoteTopicPartition(TopicPartition tp, String alias) {
return new TopicPartition(remoteTopicName(tp.topic(), alias), tp.partition());
@@ -959,7 +1090,7 @@ public class MirrorConnectorsIntegrationBaseTest {
Map<String, Object> adminClientConfig = Collections.singletonMap(
AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, REQUEST_TIMEOUT_DURATION_MS);
- // create these topics before starting the connectors so we don't need to wait for discovery
+ // create these topics before starting the connectors, so we don't need to wait for discovery
primary.kafka().createTopic("test-topic-no-checkpoints", 1, 1, emptyMap, adminClientConfig);
primary.kafka().createTopic("test-topic-1", NUM_PARTITIONS, 1, topicConfig, adminClientConfig);
backup.kafka().createTopic("test-topic-1", NUM_PARTITIONS, 1, emptyMap, adminClientConfig);
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java
index 9b2951ef19e..f5c43c718a7 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java
@@ -25,11 +25,13 @@ import kafka.zk.EmbeddedZookeeper;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.AlterConfigsResult;
import org.apache.kafka.clients.admin.DescribeTopicsResult;
import org.apache.kafka.clients.admin.ListOffsetsOptions;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.clients.admin.TopicDescription;
+import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
@@ -41,6 +43,7 @@ import org.apache.kafka.common.IsolationLevel;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.common.config.types.Password;
import org.apache.kafka.common.errors.InvalidReplicationFactorException;
@@ -353,6 +356,25 @@ public class EmbeddedKafkaCluster {
return results;
}
+    /**
+     * Incrementally update the configuration of the specified resources using the default options,
+     * blocking until the request has completed.
+     *
+     * @param configs The config operations to apply, keyed by the resource they target
+     * @return The AlterConfigsResult; an AssertionError is thrown instead if the update fails
+     */
+    public AlterConfigsResult incrementalAlterConfigs(Map<ConfigResource, Collection<AlterConfigOp>> configs) {
+        log.info("Altering configs for topics {}", configs.keySet());
+        try (Admin admin = createAdminClient()) {
+            AlterConfigsResult result = admin.incrementalAlterConfigs(configs);
+            result.all().get(); // KafkaFuture<Void>: wait for completion; its value is always null
+            log.info("Altered configurations for {}", configs.keySet());
+            return result;
+        } catch (Exception e) {
+            throw new AssertionError("Could not alter topic configurations " + configs.keySet(), e);
+        }
+    }
+
/**
* Create a Kafka topic with 1 partition and a replication factor of 1.
*