Posted to commits@kafka.apache.org by ce...@apache.org on 2023/04/10 15:56:08 UTC

[kafka] branch trunk updated: KAFKA-14420: Use incrementalAlterConfigs API for syncing topic configurations in MirrorMaker 2 (KIP-894) (#13373)

This is an automated email from the ASF dual-hosted git repository.

cegerton pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 751a8af1f0f KAFKA-14420: Use incrementalAlterConfigs API for syncing topic configurations in MirrorMaker 2 (KIP-894) (#13373)
751a8af1f0f is described below

commit 751a8af1f0f01b199d4d8e23f0c90edb7b38bd8c
Author: Gantigmaa Selenge <39...@users.noreply.github.com>
AuthorDate: Mon Apr 10 16:55:49 2023 +0100

    KAFKA-14420: Use incrementalAlterConfigs API for syncing topic configurations in MirrorMaker 2 (KIP-894) (#13373)
    
    Reviewers: Mickael Maison <mi...@gmail.com>, Chris Egerton <ch...@aiven.io>
---
 .../kafka/clients/admin/AdminClientTestUtils.java  |  25 +++
 .../kafka/connect/mirror/ConfigPropertyFilter.java |  20 +++
 .../mirror/DefaultConfigPropertyFilter.java        |  23 ++-
 .../kafka/connect/mirror/MirrorSourceConfig.java   |  28 +++-
 .../connect/mirror/MirrorSourceConnector.java      |  90 +++++++++-
 .../connect/mirror/MirrorSourceConnectorTest.java  | 185 ++++++++++++++++++++-
 .../MirrorConnectorsIntegrationBaseTest.java       | 145 +++++++++++++++-
 .../util/clusters/EmbeddedKafkaCluster.java        |  22 +++
 8 files changed, 516 insertions(+), 22 deletions(-)

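In short, KIP-894 adds two user-facing settings: use.incremental.alter.configs
on the MirrorSourceConnector and use.defaults.from on the
DefaultConfigPropertyFilter. As a rough sketch of how they might be set (the
property names come from the diff below; the rest of the MirrorMaker 2 setup
and imports are elided):

    Map<String, String> mm2Props = new HashMap<>();
    // 'requested' (the default) tries the IncrementalAlterConfigs API and falls
    // back to the deprecated AlterConfigs API on incompatible brokers.
    // Other valid values: 'required', 'never'.
    mm2Props.put("use.incremental.alter.configs", "requested");
    // For properties still at their default value, sync the source cluster's
    // default ('source') or keep the target cluster's default ('target', the default).
    mm2Props.put("use.defaults.from", "source");
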
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientTestUtils.java b/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientTestUtils.java
index d8b9f427d6b..8b2b20cfaae 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientTestUtils.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientTestUtils.java
@@ -30,6 +30,7 @@ import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.KafkaFuture;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.config.ConfigResource;
 import org.apache.kafka.common.internals.KafkaFutureImpl;
 
 public class AdminClientTestUtils {
@@ -78,6 +79,30 @@ public class AdminClientTestUtils {
         return new ListTopicsResult(future);
     }
 
+    /**
+     * Helper to create an AlterConfigsResult instance for a given Throwable.
+     * AlterConfigsResult's constructor is only accessible from within the
+     * admin package.
+     */
+    public static AlterConfigsResult alterConfigsResult(ConfigResource cr, Throwable t) {
+        KafkaFutureImpl<Void> future = new KafkaFutureImpl<>();
+        Map<ConfigResource, KafkaFuture<Void>> futures = Collections.singletonMap(cr, future);
+        future.completeExceptionally(t);
+        return new AlterConfigsResult(futures);
+    }
+
+    /**
+     * Helper to create an AlterConfigsResult instance for a given ConfigResource.
+     * AlterConfigsResult's constructor is only accessible from within the
+     * admin package.
+     */
+    public static AlterConfigsResult alterConfigsResult(ConfigResource cr) {
+        KafkaFutureImpl<Void> future = new KafkaFutureImpl<>();
+        Map<ConfigResource, KafkaFuture<Void>> futures = Collections.singletonMap(cr, future);
+        future.complete(null);
+        return new AlterConfigsResult(futures);
+    }
+
     /**
      * Helper to create a CreatePartitionsResult instance for a given Throwable.
      * CreatePartitionsResult's constructor is only accessible from within the
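
The two helpers above exist because AlterConfigsResult's constructor is
package-private; the connector tests in this patch use them to stub Admin
results. A sketch of that usage, mirroring the Mockito stubbing in
MirrorSourceConnectorTest further down:

    Admin admin = mock(Admin.class);
    ConfigResource resource = new ConfigResource(ConfigResource.Type.TOPIC, "testtopic");
    // Simulate a broker that does not support the IncrementalAlterConfigs API.
    doReturn(alterConfigsResult(resource, new UnsupportedVersionException("Unsupported API")))
            .when(admin).incrementalAlterConfigs(any());
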
diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/ConfigPropertyFilter.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/ConfigPropertyFilter.java
index ec6b3b91071..9183ebcf880 100644
--- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/ConfigPropertyFilter.java
+++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/ConfigPropertyFilter.java
@@ -25,8 +25,28 @@ import java.util.Map;
 @InterfaceStability.Evolving
 public interface ConfigPropertyFilter extends Configurable, AutoCloseable {
 
+    /**
+     * Specifies whether to replicate the given topic configuration property.
+     * Note that if a property has a default value on the source cluster,
+     * {@link #shouldReplicateSourceDefault(String)} will also be called to
+     * determine how that property should be synced.
+     */
     boolean shouldReplicateConfigProperty(String prop);
 
+    /**
+     * Specifies how to replicate the given topic configuration property
+     * that has a default value on the source cluster. Only invoked for properties
+     * that {@link #shouldReplicateConfigProperty(String)} has returned
+     * {@code true} for.
+     *
+     * @return {@code true} if the default value from the source topic should be synced
+     * to the target topic, and {@code false} if the default value for the target topic
+     * should be used instead
+     */
+    default boolean shouldReplicateSourceDefault(String prop) {
+        return false;
+    }
+
     default void close() {
         //nop
     }
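
For filter implementers, the new hook only matters for properties whose value
is a default on the source cluster. A minimal custom filter might look like
this (a sketch with a hypothetical class name, not part of this commit):

    public class SourceDefaultsFilter implements ConfigPropertyFilter {
        @Override
        public void configure(Map<String, ?> props) {
            // no configuration needed for this sketch
        }

        @Override
        public boolean shouldReplicateConfigProperty(String prop) {
            return true; // replicate every property
        }

        @Override
        public boolean shouldReplicateSourceDefault(String prop) {
            return true; // prefer source-cluster defaults over target defaults
        }
    }
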
diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/DefaultConfigPropertyFilter.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/DefaultConfigPropertyFilter.java
index 0c85f50d000..1ecf0da14b7 100644
--- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/DefaultConfigPropertyFilter.java
+++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/DefaultConfigPropertyFilter.java
@@ -30,6 +30,10 @@ public class DefaultConfigPropertyFilter implements ConfigPropertyFilter {
     
     public static final String CONFIG_PROPERTIES_EXCLUDE_CONFIG = "config.properties.exclude";
     public static final String CONFIG_PROPERTIES_EXCLUDE_ALIAS_CONFIG = "config.properties.blacklist";
+    public static final String USE_DEFAULTS_FROM = "use.defaults.from";
+    private static final String USE_DEFAULTS_FROM_DOC = "Which cluster's defaults (source or target) to use "
+                                                        + "when syncing topic configurations that have default values.";
+    private static final String USE_DEFAULTS_FROM_DEFAULT = "target";
 
     private static final String CONFIG_PROPERTIES_EXCLUDE_DOC = "List of topic configuration properties and/or regexes "
                                                                 + "that should not be replicated.";
@@ -40,11 +44,13 @@ public class DefaultConfigPropertyFilter implements ConfigPropertyFilter {
                                                                    + "unclean\\.leader\\.election\\.enable, "
                                                                    + "min\\.insync\\.replicas";
     private Pattern excludePattern = MirrorUtils.compilePatternList(CONFIG_PROPERTIES_EXCLUDE_DEFAULT);
+    private String useDefaultsFrom = USE_DEFAULTS_FROM_DEFAULT;
 
     @Override
     public void configure(Map<String, ?> props) {
         ConfigPropertyFilterConfig config = new ConfigPropertyFilterConfig(props);
         excludePattern = config.excludePattern();
+        useDefaultsFrom = config.useDefaultsFrom();
     }
 
     @Override
@@ -60,6 +66,11 @@ public class DefaultConfigPropertyFilter implements ConfigPropertyFilter {
         return !excluded(prop);
     }
 
+    @Override
+    public boolean shouldReplicateSourceDefault(String prop) {
+        return useDefaultsFrom.equals("source");
+    }
+
     static class ConfigPropertyFilterConfig extends AbstractConfig {
 
         static final ConfigDef DEF = new ConfigDef()
@@ -72,7 +83,13 @@ public class DefaultConfigPropertyFilter implements ConfigPropertyFilter {
                     Type.LIST,
                     null,
                     Importance.HIGH,
-                    "Deprecated. Use " + CONFIG_PROPERTIES_EXCLUDE_CONFIG + " instead.");
+                    "Deprecated. Use " + CONFIG_PROPERTIES_EXCLUDE_CONFIG + " instead.")
+            .define(USE_DEFAULTS_FROM,
+                    Type.STRING,
+                    USE_DEFAULTS_FROM_DEFAULT,
+                    Importance.MEDIUM,
+                    USE_DEFAULTS_FROM_DOC);
+
 
         ConfigPropertyFilterConfig(Map<String, ?> props) {
             super(DEF, ConfigUtils.translateDeprecatedConfigs(props, new String[][]{
@@ -82,5 +99,9 @@ public class DefaultConfigPropertyFilter implements ConfigPropertyFilter {
         Pattern excludePattern() {
             return MirrorUtils.compilePatternList(getList(CONFIG_PROPERTIES_EXCLUDE_CONFIG));
         }
+
+        String useDefaultsFrom() {
+            return getString(USE_DEFAULTS_FROM);
+        }
     }
 }
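
The new use.defaults.from property flows through ConfigPropertyFilterConfig,
so the built-in filter can be configured programmatically the same way the
unit tests in this patch do:

    DefaultConfigPropertyFilter filter = new DefaultConfigPropertyFilter();
    filter.configure(Collections.singletonMap(
            DefaultConfigPropertyFilter.USE_DEFAULTS_FROM, "source"));
    // shouldReplicateSourceDefault(...) now returns true for every property.
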
diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConfig.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConfig.java
index 3e582b50455..bbdbfed5843 100644
--- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConfig.java
+++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConfig.java
@@ -26,6 +26,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
 
+import static org.apache.kafka.common.config.ConfigDef.ValidString.in;
+
 public class MirrorSourceConfig extends MirrorConnectorConfig {
 
     protected static final String REFRESH_TOPICS = "refresh.topics";
@@ -73,6 +75,19 @@ public class MirrorSourceConfig extends MirrorConnectorConfig {
     public static final String SYNC_TOPIC_CONFIGS_INTERVAL_SECONDS = SYNC_TOPIC_CONFIGS + INTERVAL_SECONDS_SUFFIX;
     private static final String SYNC_TOPIC_CONFIGS_INTERVAL_SECONDS_DOC = "Frequency of topic config sync.";
     public static final long SYNC_TOPIC_CONFIGS_INTERVAL_SECONDS_DEFAULT = 10 * 60;
+    @Deprecated
+    public static final String USE_INCREMENTAL_ALTER_CONFIGS = "use.incremental.alter.configs";
+    private static final String USE_INCREMENTAL_ALTER_CONFIG_DOC = "Deprecated. Which API to use for syncing topic configs. " +
+            "The valid values are 'requested', 'required' and 'never'. " +
+            "By default, set to 'requested', which means the IncrementalAlterConfigs API is being used for syncing topic configurations " +
+            "and if any request receives an error from an incompatible broker, it will fallback to using the deprecated AlterConfigs API. " +
+            "If explicitly set to 'required', the IncrementalAlterConfigs API is used without the fallback logic and +" +
+            "if it receives an error from an incompatible broker, the connector will fail." +
+            "If explicitly set to 'never', the AlterConfig is always used." +
+            "This setting will be removed and the behaviour of 'required' will be used in Kafka 4.0, therefore users should ensure that target broker is at least 2.3.0";
+    public static final String REQUEST_INCREMENTAL_ALTER_CONFIGS = "requested";
+    public static final String REQUIRE_INCREMENTAL_ALTER_CONFIGS = "required";
+    public static final String NEVER_USE_INCREMENTAL_ALTER_CONFIGS = "never";
 
     public static final String SYNC_TOPIC_ACLS_ENABLED = SYNC_TOPIC_ACLS + ENABLED_SUFFIX;
     private static final String SYNC_TOPIC_ACLS_ENABLED_DOC = "Whether to periodically configure remote topic ACLs to match their corresponding upstream topics.";
@@ -173,6 +188,10 @@ public class MirrorSourceConfig extends MirrorConnectorConfig {
         }
     }
 
+    String useIncrementalAlterConfigs() {
+        return getString(USE_INCREMENTAL_ALTER_CONFIGS);
+    }
+
     Duration syncTopicAclsInterval() {
         if (getBoolean(SYNC_TOPIC_ACLS_ENABLED)) {
             return Duration.ofSeconds(getLong(SYNC_TOPIC_ACLS_INTERVAL_SECONDS));
@@ -279,6 +298,13 @@ public class MirrorSourceConfig extends MirrorConnectorConfig {
                     SYNC_TOPIC_CONFIGS_INTERVAL_SECONDS_DEFAULT,
                     ConfigDef.Importance.LOW,
                     SYNC_TOPIC_CONFIGS_INTERVAL_SECONDS_DOC)
+            .define(
+                    USE_INCREMENTAL_ALTER_CONFIGS,
+                    ConfigDef.Type.STRING,
+                    REQUEST_INCREMENTAL_ALTER_CONFIGS,
+                    in(REQUEST_INCREMENTAL_ALTER_CONFIGS, REQUIRE_INCREMENTAL_ALTER_CONFIGS, NEVER_USE_INCREMENTAL_ALTER_CONFIGS),
+                    ConfigDef.Importance.LOW,
+                    USE_INCREMENTAL_ALTER_CONFIG_DOC)
             .define(
                     SYNC_TOPIC_ACLS_ENABLED,
                     ConfigDef.Type.BOOLEAN,
@@ -313,7 +339,7 @@ public class MirrorSourceConfig extends MirrorConnectorConfig {
                     OFFSET_SYNCS_TOPIC_LOCATION,
                     ConfigDef.Type.STRING,
                     OFFSET_SYNCS_TOPIC_LOCATION_DEFAULT,
-                    ConfigDef.ValidString.in(SOURCE_CLUSTER_ALIAS_DEFAULT, TARGET_CLUSTER_ALIAS_DEFAULT),
+                    in(SOURCE_CLUSTER_ALIAS_DEFAULT, TARGET_CLUSTER_ALIAS_DEFAULT),
                     ConfigDef.Importance.LOW,
                     OFFSET_SYNCS_TOPIC_LOCATION_DOC)
             .define(
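
Because use.incremental.alter.configs is defined with the ValidString.in(...)
validator, any value outside the three constants is rejected when the
connector configuration is parsed. Roughly (assuming props otherwise holds a
valid connector configuration, e.g. as built by the tests' makeProps() helper):

    props.put(MirrorSourceConfig.USE_INCREMENTAL_ALTER_CONFIGS, "always");
    // Throws ConfigException: the value must be one of
    // 'requested', 'required', 'never'.
    new MirrorSourceConfig(props);
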
diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java
index 4dd3c740700..b97fc7bc577 100644
--- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java
+++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java
@@ -16,15 +16,18 @@
  */
 package org.apache.kafka.connect.mirror;
 
+import java.util.HashMap;
 import java.util.Locale;
 import java.util.Map.Entry;
 
+import java.util.concurrent.atomic.AtomicReference;
 import org.apache.kafka.clients.admin.Admin;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.common.IsolationLevel;
 import org.apache.kafka.common.config.ConfigValue;
 import org.apache.kafka.common.errors.SecurityDisabledException;
 import org.apache.kafka.connect.connector.Task;
+import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.source.ExactlyOnceSupport;
 import org.apache.kafka.connect.source.SourceConnector;
 import org.apache.kafka.common.config.ConfigDef;
@@ -41,8 +44,10 @@ import org.apache.kafka.common.resource.ResourcePatternFilter;
 import org.apache.kafka.common.resource.PatternType;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.InvalidPartitionsException;
+import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.utils.AppInfoParser;
 import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.clients.admin.AlterConfigOp;
 import org.apache.kafka.clients.admin.TopicDescription;
 import org.apache.kafka.clients.admin.Config;
 import org.apache.kafka.clients.admin.ConfigEntry;
@@ -97,6 +102,7 @@ public class MirrorSourceConnector extends SourceConnector {
     private Admin sourceAdminClient;
     private Admin targetAdminClient;
     private Admin offsetSyncsAdminClient;
+    private volatile boolean useIncrementalAlterConfigs;
     private AtomicBoolean noAclAuthorizer = new AtomicBoolean(false);
 
     public MirrorSourceConnector() {
@@ -118,6 +124,18 @@ public class MirrorSourceConnector extends SourceConnector {
         this.configPropertyFilter = configPropertyFilter;
     }
 
+    // visible for testing the deprecated setting "use.incremental.alter.configs"
+    // this constructor should be removed when the deprecated setting is removed in Kafka 4.0
+    MirrorSourceConnector(SourceAndTarget sourceAndTarget, ReplicationPolicy replicationPolicy,
+                          MirrorSourceConfig config, ConfigPropertyFilter configPropertyFilter, Admin targetAdmin) {
+        this.sourceAndTarget = sourceAndTarget;
+        this.replicationPolicy = replicationPolicy;
+        this.configPropertyFilter = configPropertyFilter;
+        this.config = config;
+        this.useIncrementalAlterConfigs = !config.useIncrementalAlterConfigs().equals(MirrorSourceConfig.NEVER_USE_INCREMENTAL_ALTER_CONFIGS);
+        this.targetAdminClient = targetAdmin;
+    }
+
     // visible for testing
     MirrorSourceConnector(Admin sourceAdminClient, Admin targetAdminClient) {
         this.sourceAdminClient = sourceAdminClient;
@@ -139,6 +157,7 @@ public class MirrorSourceConnector extends SourceConnector {
         replicationFactor = config.replicationFactor();
         sourceAdminClient = config.forwardingAdmin(config.sourceAdminConfig("replication-source-admin"));
         targetAdminClient = config.forwardingAdmin(config.targetAdminConfig("replication-target-admin"));
+        useIncrementalAlterConfigs = !config.useIncrementalAlterConfigs().equals(MirrorSourceConfig.NEVER_USE_INCREMENTAL_ALTER_CONFIGS);
         offsetSyncsAdminClient = config.forwardingAdmin(config.offsetSyncsTopicAdminConfig());
         scheduler = new Scheduler(getClass(), config.entityLabel(), config.adminTimeout());
         scheduler.execute(this::createOffsetSyncsTopic, "creating upstream offset-syncs topic");
@@ -366,12 +385,18 @@ public class MirrorSourceConnector extends SourceConnector {
         updateTopicAcls(filteredBindings);
     }
 
-    private void syncTopicConfigs()
+    // visible for testing
+    void syncTopicConfigs()
             throws InterruptedException, ExecutionException {
+        boolean incremental = useIncrementalAlterConfigs;
         Map<String, Config> sourceConfigs = describeTopicConfigs(topicsBeingReplicated());
         Map<String, Config> targetConfigs = sourceConfigs.entrySet().stream()
-            .collect(Collectors.toMap(x -> formatRemoteTopic(x.getKey()), x -> targetConfig(x.getValue())));
-        updateTopicConfigs(targetConfigs);
+            .collect(Collectors.toMap(x -> formatRemoteTopic(x.getKey()), x -> targetConfig(x.getValue(), incremental)));
+        if (incremental) {
+            incrementalAlterConfigs(targetConfigs);
+        } else {
+            deprecatedAlterConfigs(targetConfigs);
+        }
     }
 
     private void createOffsetSyncsTopic() {
@@ -432,7 +457,7 @@ public class MirrorSourceConnector extends SourceConnector {
                 .map(sourceTopic -> {
                     String remoteTopic = formatRemoteTopic(sourceTopic);
                     int partitionCount = sourceTopicToPartitionCounts.get(sourceTopic).intValue();
-                    Map<String, String> configs = configToMap(targetConfig(sourceTopicToConfig.get(sourceTopic)));
+                    Map<String, String> configs = configToMap(targetConfig(sourceTopicToConfig.get(sourceTopic), false));
                     return new NewTopic(remoteTopic, partitionCount, (short) replicationFactor)
                             .configs(configs);
                 })
@@ -501,9 +526,10 @@ public class MirrorSourceConnector extends SourceConnector {
                 .collect(Collectors.toMap(ConfigEntry::name, ConfigEntry::value));
     }
 
-    @SuppressWarnings("deprecation")
+    // visible for testing
     // use deprecated alterConfigs API for broker compatibility back to 0.11.0
-    private void updateTopicConfigs(Map<String, Config> topicConfigs) {
+    @SuppressWarnings("deprecation")
+    void deprecatedAlterConfigs(Map<String, Config> topicConfigs) {
         Map<ConfigResource, Config> configs = topicConfigs.entrySet().stream()
             .collect(Collectors.toMap(x ->
                 new ConfigResource(ConfigResource.Type.TOPIC, x.getKey()), Entry::getValue));
@@ -515,6 +541,46 @@ public class MirrorSourceConnector extends SourceConnector {
         }));
     }
 
+    // visible for testing
+    void incrementalAlterConfigs(Map<String, Config> topicConfigs) {
+        Map<ConfigResource, Collection<AlterConfigOp>> configOps = new HashMap<>();
+        for (Map.Entry<String, Config> topicConfig : topicConfigs.entrySet()) {
+            Collection<AlterConfigOp> ops = new ArrayList<>();
+            ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, topicConfig.getKey());
+            for (ConfigEntry config : topicConfig.getValue().entries()) {
+                if (config.isDefault() && !shouldReplicateSourceDefault(config.name())) {
+                    ops.add(new AlterConfigOp(config, AlterConfigOp.OpType.DELETE));
+                } else {
+                    ops.add(new AlterConfigOp(config, AlterConfigOp.OpType.SET));
+                }
+            }
+            configOps.put(configResource, ops);
+        }
+        log.trace("Syncing configs for {} topics.", configOps.size());
+        AtomicReference<Boolean> encounteredError = new AtomicReference<>(false);
+        targetAdminClient.incrementalAlterConfigs(configOps).values().forEach((k, v) -> v.whenComplete((x, e) -> {
+            if (e != null) {
+                if (config.useIncrementalAlterConfigs().equals(MirrorSourceConfig.REQUEST_INCREMENTAL_ALTER_CONFIGS)
+                        && e instanceof UnsupportedVersionException && !encounteredError.get()) {
+                    // Fallback logic
+                    log.warn("The target cluster {} is not compatible with the IncrementalAlterConfigs API. "
+                            + "Falling back to the deprecated AlterConfigs API for syncing configs for topic {}",
+                            sourceAndTarget.target(), k.name(), e);
+                    encounteredError.set(true);
+                    useIncrementalAlterConfigs = false;
+                } else if (config.useIncrementalAlterConfigs().equals(MirrorSourceConfig.REQUIRE_INCREMENTAL_ALTER_CONFIGS)
+                        && e instanceof UnsupportedVersionException && !encounteredError.get()) {
+                    log.error("Failed to sync configs for topic {} on cluster {} with IncrementalAlterConfigs API", k.name(), sourceAndTarget.target(), e);
+                    encounteredError.set(true);
+                    context.raiseError(new ConnectException("use.incremental.alter.configs was set to \"required\", but the target cluster '"
+                            + sourceAndTarget.target() + "' is not compatible with IncrementalAlterConfigs API", e));
+                } else {
+                    log.warn("Could not alter configuration of topic {}.", k.name(), e);
+                }
+            }
+        }));
+    }
+
     private void updateTopicAcls(List<AclBinding> bindings) {
         log.trace("Syncing {} topic ACL bindings.", bindings.size());
         targetAdminClient.createAcls(bindings).values().forEach((k, v) -> v.whenComplete((x, e) -> {
@@ -539,9 +605,13 @@ public class MirrorSourceConnector extends SourceConnector {
             .collect(Collectors.toMap(x -> x.getKey().name(), Entry::getValue));
     }
 
-    Config targetConfig(Config sourceConfig) {
+    Config targetConfig(Config sourceConfig, boolean incremental) {
+        // If using incrementalAlterConfigs API, sync the default property with either SET or DELETE action determined by ConfigPropertyFilter::shouldReplicateSourceDefault later.
+        // If not using incrementalAlterConfigs API, sync the default property only if ConfigPropertyFilter::shouldReplicateSourceDefault returns true.
+        // If ConfigPropertyFilter::shouldReplicateConfigProperty returns false, do not sync the property at all.
         List<ConfigEntry> entries = sourceConfig.entries().stream()
-            .filter(x -> !x.isDefault() && !x.isReadOnly() && !x.isSensitive())
+            .filter(x -> incremental || (x.isDefault() && shouldReplicateSourceDefault(x.name())) || !x.isDefault())
+            .filter(x -> !x.isReadOnly() && !x.isSensitive())
             .filter(x -> x.source() != ConfigEntry.ConfigSource.STATIC_BROKER_CONFIG)
             .filter(x -> shouldReplicateTopicConfigurationProperty(x.name()))
             .collect(Collectors.toList());
@@ -578,6 +648,10 @@ public class MirrorSourceConnector extends SourceConnector {
         return configPropertyFilter.shouldReplicateConfigProperty(property);
     }
 
+    boolean shouldReplicateSourceDefault(String property) {
+        return configPropertyFilter.shouldReplicateSourceDefault(property);
+    }
+
     // Recurse upstream to detect cycles, i.e. whether this topic is already on the target cluster
     boolean isCycle(String topic) {
         String source = replicationPolicy.topicSource(topic);
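
To summarize the incrementalAlterConfigs() path above: every synced property
becomes a SET op, except properties still at their source-cluster default for
which the filter declines source defaults; those become DELETE ops, which
reset the target topic to the target cluster's own default. A sketch of the
request built for a single topic, with hypothetical property values:

    // "retention.bytes" was set explicitly on the source topic -> SET;
    // "delete.retention.ms" is still at its source default and the filter
    // returned false from shouldReplicateSourceDefault -> DELETE.
    Collection<AlterConfigOp> ops = new ArrayList<>();
    ops.add(new AlterConfigOp(new ConfigEntry("retention.bytes", "1000"),
            AlterConfigOp.OpType.SET));
    ops.add(new AlterConfigOp(new ConfigEntry("delete.retention.ms", "86400000"),
            AlterConfigOp.OpType.DELETE));
    targetAdminClient.incrementalAlterConfigs(Collections.singletonMap(
            new ConfigResource(ConfigResource.Type.TOPIC, "source.topic1"), ops));
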
diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceConnectorTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceConnectorTest.java
index dfe1f524f22..b178fa34b6f 100644
--- a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceConnectorTest.java
+++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceConnectorTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.connect.mirror;
 
+import org.apache.kafka.clients.admin.AlterConfigOp;
 import org.apache.kafka.clients.admin.Admin;
 import org.apache.kafka.clients.admin.DescribeAclsResult;
 import org.apache.kafka.common.KafkaFuture;
@@ -24,7 +25,9 @@ import org.apache.kafka.common.acl.AccessControlEntry;
 import org.apache.kafka.common.acl.AclBinding;
 import org.apache.kafka.common.acl.AclOperation;
 import org.apache.kafka.common.acl.AclPermissionType;
+import org.apache.kafka.common.config.ConfigResource;
 import org.apache.kafka.common.config.ConfigValue;
+import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.errors.SecurityDisabledException;
 import org.apache.kafka.common.resource.PatternType;
 import org.apache.kafka.common.resource.ResourcePattern;
@@ -35,9 +38,11 @@ import org.apache.kafka.connect.connector.ConnectorContext;
 import org.apache.kafka.clients.admin.ConfigEntry;
 import org.apache.kafka.clients.admin.NewTopic;
 
+import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.source.ExactlyOnceSupport;
 import org.junit.jupiter.api.Test;
 
+import static org.apache.kafka.clients.admin.AdminClientTestUtils.alterConfigsResult;
 import static org.apache.kafka.clients.consumer.ConsumerConfig.ISOLATION_LEVEL_CONFIG;
 import static org.apache.kafka.connect.mirror.MirrorConnectorConfig.CONSUMER_CLIENT_PREFIX;
 import static org.apache.kafka.connect.mirror.MirrorConnectorConfig.SOURCE_PREFIX;
@@ -50,12 +55,14 @@ import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.mockito.ArgumentMatchers.isA;
 import static org.mockito.Mockito.any;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.eq;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
@@ -74,6 +81,19 @@ import java.util.concurrent.ExecutionException;
 import java.util.stream.Collectors;
 
 public class MirrorSourceConnectorTest {
+    private ConfigPropertyFilter getConfigPropertyFilter() {
+        return new ConfigPropertyFilter() {
+            @Override
+            public boolean shouldReplicateConfigProperty(String prop) {
+                return true;
+            }
+
+            @Override
+            public boolean shouldReplicateSourceDefault(String prop) {
+                return false;
+            }
+        };
+    }
 
     @Test
     public void testReplicatesHeartbeatsByDefault() {
@@ -94,7 +114,7 @@ public class MirrorSourceConnectorTest {
     @Test
     public void testNoCycles() {
         MirrorSourceConnector connector = new MirrorSourceConnector(new SourceAndTarget("source", "target"),
-            new DefaultReplicationPolicy(), x -> true, x -> true);
+            new DefaultReplicationPolicy(), x -> true, getConfigPropertyFilter());
         assertFalse(connector.shouldReplicateTopic("target.topic1"), "should not allow cycles");
         assertFalse(connector.shouldReplicateTopic("target.source.topic1"), "should not allow cycles");
         assertFalse(connector.shouldReplicateTopic("source.target.topic1"), "should not allow cycles");
@@ -107,7 +127,7 @@ public class MirrorSourceConnectorTest {
     @Test
     public void testIdentityReplication() {
         MirrorSourceConnector connector = new MirrorSourceConnector(new SourceAndTarget("source", "target"),
-            new IdentityReplicationPolicy(), x -> true, x -> true);
+            new IdentityReplicationPolicy(), x -> true, getConfigPropertyFilter());
         assertTrue(connector.shouldReplicateTopic("target.topic1"), "should allow cycles");
         assertTrue(connector.shouldReplicateTopic("target.source.topic1"), "should allow cycles");
         assertTrue(connector.shouldReplicateTopic("source.target.topic1"), "should allow cycles");
@@ -127,7 +147,7 @@ public class MirrorSourceConnectorTest {
     @Test
     public void testAclFiltering() {
         MirrorSourceConnector connector = new MirrorSourceConnector(new SourceAndTarget("source", "target"),
-            new DefaultReplicationPolicy(), x -> true, x -> true);
+            new DefaultReplicationPolicy(), x -> true, getConfigPropertyFilter());
         assertFalse(connector.shouldReplicateAcl(
             new AclBinding(new ResourcePattern(ResourceType.TOPIC, "test_topic", PatternType.LITERAL),
             new AccessControlEntry("kafka", "", AclOperation.WRITE, AclPermissionType.ALLOW))), "should not replicate ALLOW WRITE");
@@ -139,7 +159,7 @@ public class MirrorSourceConnectorTest {
     @Test
     public void testAclTransformation() {
         MirrorSourceConnector connector = new MirrorSourceConnector(new SourceAndTarget("source", "target"),
-            new DefaultReplicationPolicy(), x -> true, x -> true);
+            new DefaultReplicationPolicy(), x -> true, getConfigPropertyFilter());
         AclBinding allowAllAclBinding = new AclBinding(
             new ResourcePattern(ResourceType.TOPIC, "test_topic", PatternType.LITERAL),
             new AccessControlEntry("kafka", "", AclOperation.ALL, AclPermissionType.ALLOW));
@@ -209,15 +229,62 @@ public class MirrorSourceConnectorTest {
             new DefaultReplicationPolicy(), x -> true, new DefaultConfigPropertyFilter());
         ArrayList<ConfigEntry> entries = new ArrayList<>();
         entries.add(new ConfigEntry("name-1", "value-1"));
+        entries.add(new ConfigEntry("name-2", "value-2", ConfigEntry.ConfigSource.DEFAULT_CONFIG, false, false, Collections.emptyList(), ConfigEntry.ConfigType.STRING, ""));
+        entries.add(new ConfigEntry("min.insync.replicas", "2"));
+        Config config = new Config(entries);
+        Config targetConfig = connector.targetConfig(config, true);
+        assertTrue(targetConfig.entries().stream()
+            .anyMatch(x -> x.name().equals("name-1")), "should replicate properties");
+        assertTrue(targetConfig.entries().stream()
+            .anyMatch(x -> x.name().equals("name-2")), "should include default properties");
+        assertFalse(targetConfig.entries().stream()
+            .anyMatch(x -> x.name().equals("min.insync.replicas")), "should not replicate excluded properties");
+    }
+
+    @Test
+    @Deprecated
+    public void testConfigPropertyFilteringWithAlterConfigs() {
+        MirrorSourceConnector connector = new MirrorSourceConnector(new SourceAndTarget("source", "target"),
+                new DefaultReplicationPolicy(), x -> true, new DefaultConfigPropertyFilter());
+        List<ConfigEntry> entries = new ArrayList<>();
+        entries.add(new ConfigEntry("name-1", "value-1"));
+        // When "use.defaults.from" set to "target" by default, the config with default value should be excluded
+        entries.add(new ConfigEntry("name-2", "value-2", ConfigEntry.ConfigSource.DEFAULT_CONFIG, false, false, Collections.emptyList(), ConfigEntry.ConfigType.STRING, ""));
         entries.add(new ConfigEntry("min.insync.replicas", "2"));
         Config config = new Config(entries);
-        Config targetConfig = connector.targetConfig(config);
+        Config targetConfig = connector.targetConfig(config, false);
         assertTrue(targetConfig.entries().stream()
             .anyMatch(x -> x.name().equals("name-1")), "should replicate properties");
+        assertFalse(targetConfig.entries().stream()
+            .anyMatch(x -> x.name().equals("name-2")), "should not replicate default properties");
         assertFalse(targetConfig.entries().stream()
             .anyMatch(x -> x.name().equals("min.insync.replicas")), "should not replicate excluded properties");
     }
 
+    @Test
+    @Deprecated
+    public void testConfigPropertyFilteringWithAlterConfigsAndSourceDefault() {
+        Map<String, Object> filterConfig = Collections.singletonMap(DefaultConfigPropertyFilter.USE_DEFAULTS_FROM, "source");
+        DefaultConfigPropertyFilter filter = new DefaultConfigPropertyFilter();
+        filter.configure(filterConfig);
+
+        MirrorSourceConnector connector = new MirrorSourceConnector(new SourceAndTarget("source", "target"),
+                new DefaultReplicationPolicy(),  x -> true, filter);
+        List<ConfigEntry> entries = new ArrayList<>();
+        entries.add(new ConfigEntry("name-1", "value-1"));
+        // When "use.defaults.from" explicitly set to "source", the config with default value should be replicated
+        entries.add(new ConfigEntry("name-2", "value-2", ConfigEntry.ConfigSource.DEFAULT_CONFIG, false, false, Collections.emptyList(), ConfigEntry.ConfigType.STRING, ""));
+        entries.add(new ConfigEntry("min.insync.replicas", "2"));
+        Config config = new Config(entries);
+        Config targetConfig = connector.targetConfig(config, false);
+        assertTrue(targetConfig.entries().stream()
+                .anyMatch(x -> x.name().equals("name-1")), "should replicate properties");
+        assertTrue(targetConfig.entries().stream()
+                .anyMatch(x -> x.name().equals("name-2")), "should include default properties");
+        assertFalse(targetConfig.entries().stream()
+                .anyMatch(x -> x.name().equals("min.insync.replicas")), "should not replicate excluded properties");
+    }
+
     @Test
     public void testNewTopicConfigs() throws Exception {
         Map<String, Object> filterConfig = new HashMap<>();
@@ -261,6 +328,114 @@ public class MirrorSourceConnectorTest {
         verify(connector).createNewTopics(any(), any());
     }
 
+    @Test
+    @Deprecated
+    public void testIncrementalAlterConfigsRequested() throws Exception {
+        Map<String, String> props = makeProps();
+        props.put(MirrorSourceConfig.USE_INCREMENTAL_ALTER_CONFIGS, MirrorSourceConfig.REQUEST_INCREMENTAL_ALTER_CONFIGS);
+        MirrorSourceConfig connectorConfig = new MirrorSourceConfig(props);
+
+        Admin admin = mock(Admin.class);
+        MirrorSourceConnector connector = spy(new MirrorSourceConnector(new SourceAndTarget("source", "target"),
+                new DefaultReplicationPolicy(), connectorConfig, new DefaultConfigPropertyFilter(), admin));
+        final String topic = "testtopic";
+        List<ConfigEntry> entries = Collections.singletonList(new ConfigEntry("name-1", "value-1"));
+        Config config = new Config(entries);
+        doReturn(Collections.singletonMap(topic, config)).when(connector).describeTopicConfigs(any());
+        doReturn(alterConfigsResult(new ConfigResource(ConfigResource.Type.TOPIC, topic), new UnsupportedVersionException("Unsupported API"))).when(admin).incrementalAlterConfigs(any());
+        doNothing().when(connector).deprecatedAlterConfigs(any());
+        connector.syncTopicConfigs();
+        Map<String, Config> topicConfigs = Collections.singletonMap("source." + topic, config);
+        verify(connector).incrementalAlterConfigs(topicConfigs);
+
+        // the next time we sync topic configurations, expect to use the deprecated API
+        connector.syncTopicConfigs();
+        verify(connector, times(1)).deprecatedAlterConfigs(topicConfigs);
+    }
+
+    @Test
+    @Deprecated
+    public void testIncrementalAlterConfigsRequired() throws Exception {
+        Map<String, String> props = makeProps();
+        props.put(MirrorSourceConfig.USE_INCREMENTAL_ALTER_CONFIGS, MirrorSourceConfig.REQUIRE_INCREMENTAL_ALTER_CONFIGS);
+        MirrorSourceConfig connectorConfig = new MirrorSourceConfig(props);
+
+        Admin admin = mock(Admin.class);
+        MirrorSourceConnector connector = spy(new MirrorSourceConnector(new SourceAndTarget("source", "target"),
+                new DefaultReplicationPolicy(), connectorConfig, new DefaultConfigPropertyFilter(), admin));
+        final String topic = "testtopic";
+        List<ConfigEntry> entries = new ArrayList<>();
+        ConfigEntry entryWithNonDefaultValue = new ConfigEntry("name-1", "value-1");
+        ConfigEntry entryWithDefaultValue = new ConfigEntry("name-2", "value-2", ConfigEntry.ConfigSource.DEFAULT_CONFIG, false, false,
+                Collections.emptyList(), ConfigEntry.ConfigType.STRING, "");
+        entries.add(entryWithNonDefaultValue);
+        entries.add(entryWithDefaultValue);
+        Config config = new Config(entries);
+        doReturn(Collections.singletonMap(topic, config)).when(connector).describeTopicConfigs(any());
+
+        doAnswer(invocation -> {
+            Map<ConfigResource, Collection<AlterConfigOp>> configOps = invocation.getArgument(0);
+            assertNotNull(configOps);
+            assertEquals(1, configOps.size());
+
+            ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, "source." + topic);
+            Collection<AlterConfigOp> ops = new ArrayList<>();
+            ops.add(new AlterConfigOp(entryWithNonDefaultValue, AlterConfigOp.OpType.SET));
+            ops.add(new AlterConfigOp(entryWithDefaultValue, AlterConfigOp.OpType.DELETE));
+
+            assertEquals(ops, configOps.get(configResource));
+
+            return alterConfigsResult(configResource);
+        }).when(admin).incrementalAlterConfigs(any());
+
+        connector.syncTopicConfigs();
+        Map<String, Config> topicConfigs = Collections.singletonMap("source." + topic, config);
+        verify(connector).incrementalAlterConfigs(topicConfigs);
+    }
+
+    @Test
+    @Deprecated
+    public void testIncrementalAlterConfigsRequiredButUnsupported() throws Exception {
+        Map<String, String> props = makeProps();
+        props.put(MirrorSourceConfig.USE_INCREMENTAL_ALTER_CONFIGS, MirrorSourceConfig.REQUIRE_INCREMENTAL_ALTER_CONFIGS);
+        MirrorSourceConfig connectorConfig = new MirrorSourceConfig(props);
+
+        Admin admin = mock(Admin.class);
+        ConnectorContext connectorContext = mock(ConnectorContext.class);
+        MirrorSourceConnector connector = spy(new MirrorSourceConnector(new SourceAndTarget("source", "target"),
+                new DefaultReplicationPolicy(), connectorConfig, new DefaultConfigPropertyFilter(), admin));
+        connector.initialize(connectorContext);
+        final String topic = "testtopic";
+        List<ConfigEntry> entries = Collections.singletonList(new ConfigEntry("name-1", "value-1"));
+        Config config = new Config(entries);
+        doReturn(Collections.singletonMap(topic, config)).when(connector).describeTopicConfigs(any());
+        doReturn(alterConfigsResult(new ConfigResource(ConfigResource.Type.TOPIC, topic), new UnsupportedVersionException("Unsupported API"))).when(admin).incrementalAlterConfigs(any());
+
+        connector.syncTopicConfigs();
+        verify(connectorContext).raiseError(isA(ConnectException.class));
+    }
+
+
+    @Test
+    @Deprecated
+    public void testIncrementalAlterConfigsNeverUsed() throws Exception {
+        Map<String, String> props = makeProps();
+        props.put(MirrorSourceConfig.USE_INCREMENTAL_ALTER_CONFIGS, MirrorSourceConfig.NEVER_USE_INCREMENTAL_ALTER_CONFIGS);
+        MirrorSourceConfig connectorConfigs = new MirrorSourceConfig(props);
+
+        MirrorSourceConnector connector = spy(new MirrorSourceConnector(new SourceAndTarget("source", "target"),
+                new DefaultReplicationPolicy(), connectorConfigs, new DefaultConfigPropertyFilter(), null));
+        final String topic = "testtopic";
+        List<ConfigEntry> entries = Collections.singletonList(new ConfigEntry("name-1", "value-1"));
+        Config config = new Config(entries);
+        doReturn(Collections.singletonMap(topic, config)).when(connector).describeTopicConfigs(any());
+        doNothing().when(connector).deprecatedAlterConfigs(any());
+        connector.syncTopicConfigs();
+        Map<String, Config> topicConfigs = Collections.singletonMap("source." + topic, config);
+        verify(connector).deprecatedAlterConfigs(topicConfigs);
+        verify(connector, never()).incrementalAlterConfigs(any());
+    }
+
     @Test
     public void testMirrorSourceConnectorTaskConfig() {
         List<TopicPartition> knownSourceTopicPartitions = new ArrayList<>();
diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java
index 27579d3275f..802b02bfe32 100644
--- a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java
+++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java
@@ -18,7 +18,9 @@ package org.apache.kafka.connect.mirror.integration;
 
 import org.apache.kafka.clients.admin.Admin;
 import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.AlterConfigOp;
 import org.apache.kafka.clients.admin.Config;
+import org.apache.kafka.clients.admin.ConfigEntry;
 import org.apache.kafka.clients.admin.DescribeConfigsResult;
 import org.apache.kafka.clients.admin.ListOffsetsResult;
 import org.apache.kafka.clients.admin.OffsetSpec;
@@ -171,6 +173,8 @@ public class MirrorConnectorsIntegrationBaseTest {
 
         // exclude topic config:
         mm2Props.put(DefaultConfigPropertyFilter.CONFIG_PROPERTIES_EXCLUDE_CONFIG, "delete\\.retention\\..*");
+        // use a shorter sync interval for testing
+        mm2Props.put("sync.topic.configs.interval.seconds", "1");
 
         mm2Config = new MirrorMakerConfig(mm2Props);
         primaryWorkerProps = mm2Config.workerConfig(new SourceAndTarget(BACKUP_CLUSTER_ALIAS, PRIMARY_CLUSTER_ALIAS));
@@ -263,7 +267,7 @@ public class MirrorConnectorsIntegrationBaseTest {
         String reverseTopic1 = remoteTopicName("test-topic-1", BACKUP_CLUSTER_ALIAS);
         String consumerGroupName = "consumer-group-testReplication";
         Map<String, Object> consumerProps = Collections.singletonMap("group.id", consumerGroupName);
-        // warm up consumers before starting the connectors so we don't need to wait for discovery
+        // warm up consumers before starting the connectors, so we don't need to wait for discovery
         warmUpConsumer(consumerProps);
         
         mm2Config = new MirrorMakerConfig(mm2Props);
@@ -355,7 +359,6 @@ public class MirrorConnectorsIntegrationBaseTest {
                 assertTrue(primaryConsumer.position(
                         new TopicPartition(reverseTopic1, 0)) <= NUM_RECORDS_PRODUCED, "Consumer failedback beyond expected downstream offset.");
             }
-
         }
 
         // create more matching topics
@@ -399,7 +402,7 @@ public class MirrorConnectorsIntegrationBaseTest {
         // produce to all test-topic-empty's partitions, except the last partition
         produceMessages(primary, topic, NUM_PARTITIONS - 1);
         
-        // consume before starting the connectors so we don't need to wait for discovery
+        // consume before starting the connectors, so we don't need to wait for discovery
         int expectedRecords = NUM_RECORDS_PER_PARTITION * (NUM_PARTITIONS - 1);
         try (Consumer<byte[], byte[]> primaryConsumer = primary.kafka().createConsumerAndSubscribeTo(consumerProps, topic)) {
             waitForConsumingAllRecords(primaryConsumer, expectedRecords);
@@ -452,7 +455,7 @@ public class MirrorConnectorsIntegrationBaseTest {
                 put("group.id", consumerGroupName);
                 put("auto.offset.reset", "earliest");
             }};
-        // create consumers before starting the connectors so we don't need to wait for discovery
+        // create consumers before starting the connectors, so we don't need to wait for discovery
         try (Consumer<byte[], byte[]> primaryConsumer = primary.kafka().createConsumerAndSubscribeTo(consumerProps, 
                 "test-topic-1")) {
             // we need to wait for consuming all the records for MM2 replicating the expected offsets
@@ -483,7 +486,6 @@ public class MirrorConnectorsIntegrationBaseTest {
 
             waitForConsumerGroupFullSync(backup, Collections.singletonList(backupTopic1),
                     consumerGroupName, NUM_RECORDS_PRODUCED, offsetLagMax);
-
             assertDownstreamRedeliveriesBoundedByMaxLag(backupConsumer, offsetLagMax);
         }
 
@@ -503,7 +505,7 @@ public class MirrorConnectorsIntegrationBaseTest {
             waitForConsumingAllRecords(consumer1, NUM_RECORDS_PRODUCED);
         }
 
-        // create a consumer at backup cluster with same consumer group Id to consume old and new topic
+        // create a consumer at backup cluster with same consumer group ID to consume old and new topic
         try (Consumer<byte[], byte[]> backupConsumer = backup.kafka().createConsumerAndSubscribeTo(Collections.singletonMap(
             "group.id", consumerGroupName), backupTopic1, remoteTopic2)) {
 
@@ -655,6 +657,135 @@ public class MirrorConnectorsIntegrationBaseTest {
         assertMonotonicCheckpoints(backup, "primary.checkpoints.internal");
     }
 
+    @Test
+    public void testSyncTopicConfigs() throws InterruptedException {
+        mm2Config = new MirrorMakerConfig(mm2Props);
+
+        waitUntilMirrorMakerIsRunning(backup, CONNECTOR_LIST, mm2Config, PRIMARY_CLUSTER_ALIAS, BACKUP_CLUSTER_ALIAS);
+
+        // create topic with configuration to test:
+        final Map<String, String> topicConfig = new HashMap<>();
+        topicConfig.put("delete.retention.ms", "1000"); // should be excluded (default value is 86400000)
+        topicConfig.put("retention.bytes", "1000"); // should be included, default value is -1
+
+        final String topic = "test-topic-with-config";
+        final String backupTopic = remoteTopicName(topic, PRIMARY_CLUSTER_ALIAS);
+
+        primary.kafka().createTopic(topic, NUM_PARTITIONS, 1, topicConfig);
+        waitForTopicCreated(backup, backupTopic);
+
+        // alter configs on the target topic
+        ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, backupTopic);
+        Collection<AlterConfigOp> ops = new ArrayList<>();
+        ops.add(new AlterConfigOp(new ConfigEntry("delete.retention.ms", "2000"), AlterConfigOp.OpType.SET));
+        ops.add(new AlterConfigOp(new ConfigEntry("retention.bytes", "2000"), AlterConfigOp.OpType.SET));
+        Map<ConfigResource, Collection<AlterConfigOp>> configOps = Collections.singletonMap(configResource, ops);
+        // alter configs on target cluster
+        backup.kafka().incrementalAlterConfigs(configOps);
+
+        waitForCondition(() -> {
+            String primaryConfig, backupConfig;
+            primaryConfig = getTopicConfig(primary.kafka(), topic, "delete.retention.ms");
+            backupConfig = getTopicConfig(backup.kafka(), backupTopic, "delete.retention.ms");
+            assertNotEquals(primaryConfig, backupConfig,
+                    "`delete.retention.ms` should be different, because it's in exclude filter! ");
+            assertEquals("2000", backupConfig, "`delete.retention.ms` should be 2000, because it's explicitly defined on the target topic! ");
+
+            // regression test for configs that are still supposed to be replicated
+            primaryConfig = getTopicConfig(primary.kafka(), topic, "retention.bytes");
+            backupConfig = getTopicConfig(backup.kafka(), backupTopic, "retention.bytes");
+            assertEquals(primaryConfig, backupConfig,
+                    "`retention.bytes` should be the same, because it isn't in exclude filter! ");
+            return true;
+        }, 30000, "Topic configurations were not synced");
+    }
+
+    @Test
+    public void testReplicateSourceDefault() throws Exception {
+        mm2Props.put(DefaultConfigPropertyFilter.USE_DEFAULTS_FROM, "source");
+        mm2Config = new MirrorMakerConfig(mm2Props);
+
+        waitUntilMirrorMakerIsRunning(backup, CONNECTOR_LIST, mm2Config, PRIMARY_CLUSTER_ALIAS, BACKUP_CLUSTER_ALIAS);
+
+        // create topic with default configurations to test
+        final String topic = "test-topic-with-config";
+        final String backupTopic = remoteTopicName(topic, PRIMARY_CLUSTER_ALIAS);
+
+        primary.kafka().createTopic(topic, NUM_PARTITIONS, 1, new HashMap<>());
+        waitForTopicCreated(backup, backupTopic);
+
+        // alter target topic configurations
+        ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, backupTopic);
+        Collection<AlterConfigOp> ops = new ArrayList<>();
+        ops.add(new AlterConfigOp(new ConfigEntry("delete.retention.ms", "2000"), AlterConfigOp.OpType.SET));
+        ops.add(new AlterConfigOp(new ConfigEntry("retention.bytes", "2000"), AlterConfigOp.OpType.SET));
+        Map<ConfigResource, Collection<AlterConfigOp>> configOps = Collections.singletonMap(configResource, ops);
+        backup.kafka().incrementalAlterConfigs(configOps);
+
+        waitForCondition(() -> {
+            String primaryConfig, backupConfig;
+            // altered configuration of the target topic should be synced with the source cluster's default
+            primaryConfig = getTopicConfig(primary.kafka(), topic, "retention.bytes");
+            backupConfig = getTopicConfig(backup.kafka(), backupTopic, "retention.bytes");
+            assertEquals(primaryConfig, backupConfig,
+                    "`retention.bytes` should be the same, because the source cluster default is being used! ");
+            assertEquals("-1", backupConfig,
+                    "`retention.bytes` should be synced with default value!");
+
+            // when using the source cluster's default, the excluded configuration should still not be changed
+            primaryConfig = getTopicConfig(primary.kafka(), topic, "delete.retention.ms");
+            backupConfig = getTopicConfig(backup.kafka(), backupTopic, "delete.retention.ms");
+            assertNotEquals(primaryConfig, backupConfig,
+                    "`delete.retention.ms` should be different, because it's in exclude filter! ");
+            assertEquals("2000", backupConfig, "`delete.retention.ms` should be 2000, because it's explicitly defined on the target topic! ");
+            return true;
+        }, 30000, "Topic configurations were not synced");
+    }
+
+    @Test
+    public void testReplicateTargetDefault() throws Exception {
+        mm2Config = new MirrorMakerConfig(mm2Props);
+
+        waitUntilMirrorMakerIsRunning(backup, CONNECTOR_LIST, mm2Config, PRIMARY_CLUSTER_ALIAS, BACKUP_CLUSTER_ALIAS);
+
+        // create topic with configuration to test:
+        final Map<String, String> topicConfig = new HashMap<>();
+        topicConfig.put("retention.bytes", "1000");
+
+        final String topic = "test-topic-with-config";
+        final String backupTopic = remoteTopicName(topic, PRIMARY_CLUSTER_ALIAS);
+
+        primary.kafka().createTopic(topic, NUM_PARTITIONS, 1, topicConfig);
+        waitForTopicCreated(backup, backupTopic);
+
+        waitForCondition(() -> {
+            String primaryConfig, backupConfig;
+            primaryConfig = getTopicConfig(primary.kafka(), topic, "retention.bytes");
+            backupConfig = getTopicConfig(backup.kafka(), backupTopic, "retention.bytes");
+            assertEquals(primaryConfig, backupConfig,
+                    "`retention.bytes` should be the same");
+            assertEquals("1000", backupConfig,
+                    "`retention.bytes` should be synced with default value!");
+            return true;
+        }, 30000, "Topic configurations were not synced");
+
+        // delete the explicitly set configuration from the source topic
+        ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, topic);
+        Collection<AlterConfigOp> ops = new ArrayList<>();
+        ops.add(new AlterConfigOp(new ConfigEntry("retention.bytes", "1000"), AlterConfigOp.OpType.DELETE));
+        Map<ConfigResource, Collection<AlterConfigOp>> configOps = Collections.singletonMap(configResource, ops);
+        primary.kafka().incrementalAlterConfigs(configOps);
+
+        waitForCondition(() -> {
+            String backupConfig;
+            // the configuration on the target topic should be changed to the target cluster's default
+            backupConfig = getTopicConfig(backup.kafka(), backupTopic, "retention.bytes");
+            assertEquals("-1", backupConfig,
+                    "`retention.bytes` should be synced with target's default value!");
+            return true;
+        }, 30000, "Topic configurations were not synced");
+    }
+
 
     private TopicPartition remoteTopicPartition(TopicPartition tp, String alias) {
         return new TopicPartition(remoteTopicName(tp.topic(), alias), tp.partition());
@@ -959,7 +1090,7 @@ public class MirrorConnectorsIntegrationBaseTest {
         Map<String, Object> adminClientConfig = Collections.singletonMap(
             AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, REQUEST_TIMEOUT_DURATION_MS);
 
-        // create these topics before starting the connectors so we don't need to wait for discovery
+        // create these topics before starting the connectors, so we don't need to wait for discovery
         primary.kafka().createTopic("test-topic-no-checkpoints", 1, 1, emptyMap, adminClientConfig);
         primary.kafka().createTopic("test-topic-1", NUM_PARTITIONS, 1, topicConfig, adminClientConfig);
         backup.kafka().createTopic("test-topic-1", NUM_PARTITIONS, 1, emptyMap, adminClientConfig);
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java
index 9b2951ef19e..f5c43c718a7 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java
@@ -25,11 +25,13 @@ import kafka.zk.EmbeddedZookeeper;
 import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.clients.admin.Admin;
 import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.AlterConfigsResult;
 import org.apache.kafka.clients.admin.DescribeTopicsResult;
 import org.apache.kafka.clients.admin.ListOffsetsOptions;
 import org.apache.kafka.clients.admin.NewTopic;
 import org.apache.kafka.clients.admin.OffsetSpec;
 import org.apache.kafka.clients.admin.TopicDescription;
+import org.apache.kafka.clients.admin.AlterConfigOp;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
@@ -41,6 +43,7 @@ import org.apache.kafka.common.IsolationLevel;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.KafkaFuture;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.config.ConfigResource;
 import org.apache.kafka.common.config.SslConfigs;
 import org.apache.kafka.common.config.types.Password;
 import org.apache.kafka.common.errors.InvalidReplicationFactorException;
@@ -353,6 +356,25 @@ public class EmbeddedKafkaCluster {
         return results;
     }
 
+    /**
+     * Update the configuration for the specified resources with the default options.
+     *
+     * @param configs The resources with their configs (topic is the only resource type with configs that can
+     *                be updated currently)
+     * @return The AlterConfigsResult
+     */
+    public AlterConfigsResult incrementalAlterConfigs(Map<ConfigResource, Collection<AlterConfigOp>> configs) {
+        AlterConfigsResult result;
+        log.info("Altering configs for topics {}", configs.keySet());
+        try (Admin admin = createAdminClient()) {
+            result = admin.incrementalAlterConfigs(configs);
+            log.info("Altered configurations {}", result.all().get());
+        } catch (Exception e) {
+            throw new AssertionError("Could not alter topic configurations " + configs.keySet(), e);
+        }
+        return result;
+    }
+
     /**
      * Create a Kafka topic with 1 partition and a replication factor of 1.
      *