You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by rs...@apache.org on 2021/02/15 17:49:13 UTC

[kafka] branch 2.7 updated: KAFKA-10710; MM2 - Create herders only if source->target.enabled=true and heartbeats are disabled (#9589)

This is an automated email from the ASF dual-hosted git repository.

rsivaram pushed a commit to branch 2.7
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.7 by this push:
     new 1c7c93b  KAFKA-10710; MM2 - Create herders only if source->target.enabled=true and heartbeats are disabled (#9589)
1c7c93b is described below

commit 1c7c93ba37308f4c3c3282b455b5263c0e455f09
Author: Julien Chanaud <ch...@gmail.com>
AuthorDate: Thu Jan 28 23:52:51 2021 +0100

    KAFKA-10710; MM2 - Create herders only if source->target.enabled=true and heartbeats are disabled (#9589)
    
    By default Mirror Maker 2 creates herders for all the possible combinations even if the "links" are not enabled.
    
    This is because the heartbeats are emitted from the "opposite" herder.
    If there is a replication flow from A to B and heartbeats are required, 2 herders are needed:
    
    - A->B for the MirrorSourceConnector
    - B->A for the MirrorHeartbeatConnector
    
    The MirrorHeartbeatConnector on B->A emits beats into topic heartbeats on cluster A.
    The MirrorSourceConnector on A->B then replicates whichever topic is configured as well as heartbeats.
    
    In cases with multiple clusters (10 or more), this leads to an enormous number of unnecessary connections, file descriptors and configuration topics being created in every target cluster.
    
    With this code change, we leverage the top-level property "emit.heartbeats.enabled", which defaults to "true".
    We skip creating the A->B herder whenever both A->B.emit.heartbeats.enabled=false (defaults to true) and A->B.enabled=false (defaults to false).
    
    Existing users will not see any change, and if they depend on these "opposite" herders for their monitoring, it will still work.
    New users with more complex use cases can change this property and fine-tune their heartbeat generation.
    
    Reviewers: Ryanne Dolan <ry...@gmail.com>,  Sanjana Kaundinya <sk...@gmail.com>, Jason Gustafson <ja...@confluent.io>
---
 .../kafka/connect/mirror/MirrorMakerConfig.java    | 23 ++++++-
 .../connect/mirror/MirrorMakerConfigTest.java      | 75 ++++++++++++++++++++++
 2 files changed, 96 insertions(+), 2 deletions(-)

diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java
index 059ab78..b5c361c 100644
--- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java
+++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java
@@ -87,11 +87,30 @@ public class MirrorMakerConfig extends AbstractConfig {
     public List<SourceAndTarget> clusterPairs() {
         List<SourceAndTarget> pairs = new ArrayList<>();
         Set<String> clusters = clusters();
+        Map<String, String> originalStrings = originalsStrings();
+        boolean globalHeartbeatsEnabled = MirrorConnectorConfig.EMIT_HEARTBEATS_ENABLED_DEFAULT;
+        if (originalStrings.containsKey(MirrorConnectorConfig.EMIT_HEARTBEATS_ENABLED)) {
+            globalHeartbeatsEnabled = Boolean.valueOf(originalStrings.get(MirrorConnectorConfig.EMIT_HEARTBEATS_ENABLED));
+        }
+
         for (String source : clusters) {
             for (String target : clusters) {
-                SourceAndTarget sourceAndTarget = new SourceAndTarget(source, target);
                 if (!source.equals(target)) {
-                    pairs.add(sourceAndTarget);
+                    String clusterPairConfigPrefix = source + "->" + target + ".";
+                    boolean clusterPairEnabled = Boolean.valueOf(originalStrings.getOrDefault(clusterPairConfigPrefix + "enabled", "false"));
+                    boolean clusterPairHeartbeatsEnabled = globalHeartbeatsEnabled;
+                    if (originalStrings.containsKey(clusterPairConfigPrefix + MirrorConnectorConfig.EMIT_HEARTBEATS_ENABLED)) {
+                        clusterPairHeartbeatsEnabled = Boolean.valueOf(originalStrings.get(clusterPairConfigPrefix + MirrorConnectorConfig.EMIT_HEARTBEATS_ENABLED));
+                    }
+
+                    // By default, all source->target Herder combinations are created even if `x->y.enabled=false`
+                    // Unless `emit.heartbeats.enabled=false` or `x->y.emit.heartbeats.enabled=false`
+                    // Reason for this behavior: for a given replication flow A->B with heartbeats, 2 herders are required :
+                    // B->A for the MirrorHeartbeatConnector (emits heartbeats into A for monitoring replication health)
+                    // A->B for the MirrorSourceConnector (actual replication flow)
+                    if (clusterPairEnabled || clusterPairHeartbeatsEnabled) {
+                        pairs.add(new SourceAndTarget(source, target));
+                    }
                 }
             }
         }
diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorMakerConfigTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorMakerConfigTest.java
index 1cba87f..c223acc 100644
--- a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorMakerConfigTest.java
+++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorMakerConfigTest.java
@@ -28,6 +28,7 @@ import java.util.Set;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Arrays;
+import java.util.List;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -256,6 +257,80 @@ public class MirrorMakerConfigTest {
             "secret2", bProps.get("producer.ssl.key.password"));
     }
 
+    @Test
+    public void testClusterPairsWithDefaultSettings() {
+        MirrorMakerConfig mirrorConfig = new MirrorMakerConfig(makeProps(
+                "clusters", "a, b, c"));
+        // implicit configuration resulting from the defaults:
+        // a->b.enabled=false
+        // a->b.emit.heartbeats.enabled=true
+        // a->c.enabled=false
+        // a->c.emit.heartbeats.enabled=true
+        // b->a.enabled=false
+        // b->a.emit.heartbeats.enabled=true
+        // b->c.enabled=false
+        // b->c.emit.heartbeats.enabled=true
+        // c->a.enabled=false
+        // c->a.emit.heartbeats.enabled=true
+        // c->b.enabled=false
+        // c->b.emit.heartbeats.enabled=true
+        List<SourceAndTarget> clusterPairs = mirrorConfig.clusterPairs();
+        assertEquals("clusterPairs count should match all combinations count",
+                6, clusterPairs.size());
+    }
+
+    @Test
+    public void testEmptyClusterPairsWithGloballyDisabledHeartbeats() {
+        MirrorMakerConfig mirrorConfig = new MirrorMakerConfig(makeProps(
+                "clusters", "a, b, c",
+                "emit.heartbeats.enabled", "false"));
+        assertEquals("clusterPairs count should be 0",
+                0, mirrorConfig.clusterPairs().size());
+    }
+
+    @Test
+    public void testClusterPairsWithTwoDisabledHeartbeats() {
+        MirrorMakerConfig mirrorConfig = new MirrorMakerConfig(makeProps(
+                "clusters", "a, b, c",
+                "a->b.emit.heartbeats.enabled", "false",
+                "a->c.emit.heartbeats.enabled", "false"));
+        List<SourceAndTarget> clusterPairs = mirrorConfig.clusterPairs();
+        assertEquals("clusterPairs count should match all combinations count except x->y.emit.heartbeats.enabled=false",
+                4, clusterPairs.size());
+    }
+
+    @Test
+    public void testClusterPairsWithGloballyDisabledHeartbeats() {
+        MirrorMakerConfig mirrorConfig = new MirrorMakerConfig(makeProps(
+                "clusters", "a, b, c, d, e, f",
+                "emit.heartbeats.enabled", "false",
+                "a->b.enabled", "true",
+                "a->c.enabled", "true",
+                "a->d.enabled", "true",
+                "a->e.enabled", "false",
+                "a->f.enabled", "false"));
+        List<SourceAndTarget> clusterPairs = mirrorConfig.clusterPairs();
+        assertEquals("clusterPairs count should match (x->y.enabled=true or x->y.emit.heartbeats.enabled=true) count",
+                3, clusterPairs.size());
+
+        // Link b->a.enabled doesn't exist therefore it must not be in clusterPairs
+        SourceAndTarget sourceAndTarget = new SourceAndTarget("b", "a");
+        assertFalse("disabled/unset link x->y should not be in clusterPairs", clusterPairs.contains(sourceAndTarget));
+    }
+
+    @Test
+    public void testClusterPairsWithGloballyDisabledHeartbeatsCentralLocal() {
+        MirrorMakerConfig mirrorConfig = new MirrorMakerConfig(makeProps(
+                "clusters", "central, local_one, local_two, beats_emitter",
+                "emit.heartbeats.enabled", "false",
+                "central->local_one.enabled", "true",
+                "central->local_two.enabled", "true",
+                "beats_emitter->central.emit.heartbeats.enabled", "true"));
+
+        assertEquals("clusterPairs count should match (x->y.enabled=true or x->y.emit.heartbeats.enabled=true) count",
+                3, mirrorConfig.clusterPairs().size());
+    }
+
     public static class FakeConfigProvider implements ConfigProvider {
 
         Map<String, String> secrets = Collections.singletonMap("password", "secret2");