Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/05/07 19:17:18 UTC

[GitHub] [kafka] ryannedolan opened a new pull request #10652: WIP KAFKA-9726 LegacyReplicationPolicy (Ryanne's version)

ryannedolan opened a new pull request #10652:
URL: https://github.com/apache/kafka/pull/10652


   N.B. there are a few impls of this floating around.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] mdedetrich commented on pull request #10652: KAFKA-9726 IdentityReplicationPolicy

Posted by GitBox <gi...@apache.org>.
mdedetrich commented on pull request #10652:
URL: https://github.com/apache/kafka/pull/10652#issuecomment-869693584


   Is there anything left on this PR to be merged (apart from the changelog which is a nice to have)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] mimaison commented on pull request #10652: KAFKA-9726 IdentityReplicationPolicy

Posted by GitBox <gi...@apache.org>.
mimaison commented on pull request #10652:
URL: https://github.com/apache/kafka/pull/10652#issuecomment-871603484


   @ryannedolan I was about to merge but I noticed `IdentityReplicationIntegrationTest.testOffsetSyncsTopicsOnTarget()` is failing. It probably needs some adjustments to work with this policy, as the computed offset-syncs topic name looks odd:
   ```
   org.opentest4j.AssertionFailedError: Condition not met within timeout 30000. Unable to find checkpoints for primarytest-topic-1 ==> expected: <true> but was: <false>
   ```





[GitHub] [kafka] mdedetrich commented on pull request #10652: KAFKA-9726 IdentityReplicationPolicy

Posted by GitBox <gi...@apache.org>.
mdedetrich commented on pull request #10652:
URL: https://github.com/apache/kafka/pull/10652#issuecomment-853663746


   @mimaison As my PR at https://github.com/apache/kafka/pull/10648 has been superseded by this one, would it be possible to do a quick review of it? AFAIK it's ready, and @ryannedolan has done great work in avoiding the need for a KIP, so there shouldn't be any blockers.





[GitHub] [kafka] mdedetrich edited a comment on pull request #10652: KAFKA-9726 IdentityReplicationPolicy

Posted by GitBox <gi...@apache.org>.
mdedetrich edited a comment on pull request #10652:
URL: https://github.com/apache/kafka/pull/10652#issuecomment-869693584


   Is there anything left on this PR to be merged (apart from the changelog which is a nice to have)?





[GitHub] [kafka] mimaison commented on a change in pull request #10652: KAFKA-9726 IdentityReplicationPolicy

Posted by GitBox <gi...@apache.org>.
mimaison commented on a change in pull request #10652:
URL: https://github.com/apache/kafka/pull/10652#discussion_r649253273



##########
File path: connect/mirror-client/src/test/java/org/apache/kafka/connect/mirror/MirrorClientTest.java
##########
@@ -159,4 +191,12 @@ public void remoteTopicsSeparatorTest() throws InterruptedException {
         assertTrue(remoteTopics.contains("source3__source4__source5__topic6"));
     }
 
+    public void testIdentityReplicationTopicSource() {

Review comment:
       Missing `@Test` annotation

##########
File path: connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java
##########
@@ -492,7 +492,12 @@ boolean isCycle(String topic) {
         } else if (source.equals(sourceAndTarget.target())) {
             return true;
         } else {
-            return isCycle(replicationPolicy.upstreamTopic(topic));
+            String upstreamTopic = replicationPolicy.upstreamTopic(topic);
+            if (upstreamTopic.equals(topic)) {

Review comment:
       Can we cover this new branch with a test in `MirrorSourceConnectorTest`?
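
To illustrate why the new branch matters: with an identity-style policy, `upstreamTopic(topic)` can hand back the same name it was given, so an unguarded recursive call would never terminate. The sketch below is not the connector's code. `CycleCheckSketch`, the nested `Policy` interface, the `.` separator and the `return false` inside the guard are all assumptions made for illustration, since the diff above is cut off before the body of the branch.

```java
class CycleCheckSketch {
    /** Hypothetical stand-in for a replication policy; not Kafka's interface. */
    interface Policy {
        String topicSource(String topic);   // alias of the cluster a topic was replicated from, or null
        String upstreamTopic(String topic); // the topic's name on that source cluster, or null
    }

    /** Prefixing policy: "primary.topic1" was replicated from "primary", where it is "topic1". */
    static final Policy PREFIXING = new Policy() {
        @Override public String topicSource(String topic) {
            int i = topic.indexOf('.');
            return i < 0 ? null : topic.substring(0, i);
        }
        @Override public String upstreamTopic(String topic) {
            int i = topic.indexOf('.');
            return i < 0 ? null : topic.substring(i + 1);
        }
    };

    /** Identity-style policy: names are never changed, so upstreamTopic returns its input. */
    static Policy identity(final String sourceAlias) {
        return new Policy() {
            @Override public String topicSource(String topic) { return sourceAlias; }
            @Override public String upstreamTopic(String topic) { return topic; }
        };
    }

    static boolean isCycle(Policy policy, String targetAlias, String topic) {
        String source = policy.topicSource(topic);
        if (source == null) {
            return false;
        } else if (source.equals(targetAlias)) {
            return true;
        }
        String upstream = policy.upstreamTopic(topic);
        if (upstream == null || upstream.equals(topic)) {
            // Assumed outcome of the new branch: the name cannot be unwound any further,
            // so stop instead of recursing on the same topic forever.
            return false;
        }
        return isCycle(policy, targetAlias, upstream);
    }

    public static void main(String[] args) {
        System.out.println(isCycle(PREFIXING, "backup", "primary.backup.topic1")); // true
        System.out.println(isCycle(identity("primary"), "backup", "topic1"));      // false
    }
}
```

A test for the branch would presumably look like the second call: a policy whose upstream name equals the input, with the assertion that the check returns instead of overflowing the stack.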

##########
File path: connect/mirror-client/src/test/java/org/apache/kafka/connect/mirror/MirrorClientTest.java
##########
@@ -159,4 +191,12 @@ public void remoteTopicsSeparatorTest() throws InterruptedException {
         assertTrue(remoteTopics.contains("source3__source4__source5__topic6"));
     }
 
+    public void testIdentityReplicationTopicSource() {
+        MirrorClient client = new FakeMirrorClient(
+            new IdentityReplicationPolicy("primary"), Arrays.asList());
+        assertEquals("topic1", client.replicationPolicy()
+            .formatRemoteTopic("primary", "topic1"));

Review comment:
       Should we also try `formatRemoteTopic()` with a heartbeat topic?
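
A rough sketch of what such a heartbeat case could check. It assumes, which this thread does not confirm, that heartbeat topics are the one exception that still receives the source-alias prefix under the identity policy. `IdentityNamingSketch`, the `heartbeats` topic name and the `.` separator are illustrative assumptions, not the class under review.

```java
class IdentityNamingSketch {
    static final String HEARTBEATS = "heartbeats"; // assumed heartbeat topic name
    static final String SEPARATOR = ".";           // assumed separator

    /** Prefixing behaviour: the remote topic carries the source cluster alias. */
    static String defaultFormat(String sourceAlias, String topic) {
        return sourceAlias + SEPARATOR + topic;
    }

    static boolean looksLikeHeartbeat(String topic) {
        return topic != null && topic.endsWith(HEARTBEATS);
    }

    /** Identity behaviour: names pass through unchanged, except (by assumption) heartbeats. */
    static String identityFormat(String sourceAlias, String topic) {
        return looksLikeHeartbeat(topic) ? defaultFormat(sourceAlias, topic) : topic;
    }

    public static void main(String[] args) {
        System.out.println(identityFormat("primary", "topic1"));
        System.out.println(identityFormat("primary", "heartbeats"));
    }
}
```

Under these assumptions the extra assertion would compare `formatRemoteTopic("primary", "heartbeats")` against a prefixed name, alongside the existing `topic1` check.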

##########
File path: connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/MirrorClient.java
##########
@@ -60,7 +60,7 @@
     private ReplicationPolicy replicationPolicy;
     private Map<String, Object> consumerConfig;
 
-    public MirrorClient(Map<String, Object> props) {
+    public MirrorClient(Map<String, ?> props) {

Review comment:
       Is this actually needed?







[GitHub] [kafka] mimaison commented on pull request #10652: KAFKA-9726 IdentityReplicationPolicy

Posted by GitBox <gi...@apache.org>.
mimaison commented on pull request #10652:
URL: https://github.com/apache/kafka/pull/10652#issuecomment-869893715


   Let's put a line in the changelog now so we're sure it's included in the release notes. I'm happy to merge once this is done.
   
   I agree the documentation can come later.





[GitHub] [kafka] ryannedolan commented on a change in pull request #10652: WIP KAFKA-9726 LegacyReplicationPolicy (Ryanne's version)

Posted by GitBox <gi...@apache.org>.
ryannedolan commented on a change in pull request #10652:
URL: https://github.com/apache/kafka/pull/10652#discussion_r628457818



##########
File path: connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java
##########
@@ -171,14 +177,22 @@ public void startClusters() throws Exception {
         waitForTopicCreated(primary, "mm2-status.backup.internal");
         waitForTopicCreated(primary, "mm2-offsets.backup.internal");
         waitForTopicCreated(primary, "mm2-configs.backup.internal");
-        
+ 
+        waitForTopicCreated(backup, "mm2-status.primary.internal");
+        waitForTopicCreated(backup, "mm2-offsets.primary.internal");
+        waitForTopicCreated(backup, "mm2-configs.primary.internal");
+       
         backup.start();

Review comment:
       whoops, bug here!







[GitHub] [kafka] ryannedolan commented on a change in pull request #10652: KAFKA-9726 IdentityReplicationPolicy

Posted by GitBox <gi...@apache.org>.
ryannedolan commented on a change in pull request #10652:
URL: https://github.com/apache/kafka/pull/10652#discussion_r661007855



##########
File path: docs/upgrade.html
##########
@@ -80,7 +80,13 @@ <h5><a id="upgrade_300_notable" href="#upgrade_300_notable">Notable changes in 3
         understood by brokers or version 2.5 or higher, so you must upgrade your kafka cluster to get the stronger semantics. Otherwise, you can just pass
         in <code>new ConsumerGroupMetadata(consumerGroupId)</code> to work with older brokers. See <a href="https://cwiki.apache.org/confluence/x/zJONCg">KIP-732</a> for more details.
     </li>
-
+    <li> The Connect-based MirrorMaker (MM2) includes changes to support <code>IdentityReplicationPolicy</code>, enabling replication without renaming topics.

Review comment:
       @mimaison @mdedetrich wdyt?







[GitHub] [kafka] ryannedolan commented on pull request #10652: KAFKA-9726 IdentityReplicationPolicy

Posted by GitBox <gi...@apache.org>.
ryannedolan commented on pull request #10652:
URL: https://github.com/apache/kafka/pull/10652#issuecomment-854215843


   None of the failing tests are related. Ready to merge.





[GitHub] [kafka] mdedetrich commented on a change in pull request #10652: KAFKA-9726 IdentityReplicationPolicy

Posted by GitBox <gi...@apache.org>.
mdedetrich commented on a change in pull request #10652:
URL: https://github.com/apache/kafka/pull/10652#discussion_r661207152



##########
File path: docs/upgrade.html
##########
@@ -80,7 +80,13 @@ <h5><a id="upgrade_300_notable" href="#upgrade_300_notable">Notable changes in 3
         understood by brokers or version 2.5 or higher, so you must upgrade your kafka cluster to get the stronger semantics. Otherwise, you can just pass
         in <code>new ConsumerGroupMetadata(consumerGroupId)</code> to work with older brokers. See <a href="https://cwiki.apache.org/confluence/x/zJONCg">KIP-732</a> for more details.
     </li>
-
+    <li> The Connect-based MirrorMaker (MM2) includes changes to support <code>IdentityReplicationPolicy</code>, enabling replication without renaming topics.

Review comment:
       I would make a quick amendment stating that it works like the original MM1 to make it more clear.







[GitHub] [kafka] ryannedolan commented on a change in pull request #10652: KAFKA-9726 IdentityReplicationPolicy

Posted by GitBox <gi...@apache.org>.
ryannedolan commented on a change in pull request #10652:
URL: https://github.com/apache/kafka/pull/10652#discussion_r655712378



##########
File path: connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/IdentityReplicationPolicy.java
##########
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.mirror;
+
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** IdentityReplicationPolicy does not rename remote topics. This is useful for migrating
+  * from legacy MM1, or for any use-case involving one-way replication.
+  *
+  * N.B. MirrorMaker is not able to prevent cycles when using this class, so take care that
+  * your replication topology is acyclic. If migrating from MirrorMaker v1, this will likely
+  * already be the case.
+  */
+public class IdentityReplicationPolicy extends DefaultReplicationPolicy {
+    private static final Logger log = LoggerFactory.getLogger(IdentityReplicationPolicy.class);
+
+    public static final String SOURCE_CLUSTER_ALIAS_CONFIG = "source.cluster.alias";
+
+    private String sourceClusterAlias = null;
+
+    public IdentityReplicationPolicy() {
+        //nop
+    }
+
+    // Visible for testing
+    IdentityReplicationPolicy(String sourceClusterAlias) {

Review comment:
       fixed, thanks







[GitHub] [kafka] Justinwins commented on a change in pull request #10652: KAFKA-9726 IdentityReplicationPolicy

Posted by GitBox <gi...@apache.org>.
Justinwins commented on a change in pull request #10652:
URL: https://github.com/apache/kafka/pull/10652#discussion_r684042834



##########
File path: docs/upgrade.html
##########
@@ -83,7 +83,13 @@ <h5><a id="upgrade_300_notable" href="#upgrade_300_notable">Notable changes in 3
         understood by brokers or version 2.5 or higher, so you must upgrade your kafka cluster to get the stronger semantics. Otherwise, you can just pass
         in <code>new ConsumerGroupMetadata(consumerGroupId)</code> to work with older brokers. See <a href="https://cwiki.apache.org/confluence/x/zJONCg">KIP-732</a> for more details.
     </li>
-
+    <li> The Connect-based MirrorMaker (MM2) includes changes to support <code>IdentityReplicationPolicy</code>, enabling replication without renaming topics.
+        The existing <code>DefaultReplicationPolicy</code> is still used by default, but identity replication can be enabled via the
+        <code>replication.policy</code> configuration property. This is especially useful for users migrating from the older MirrorMaker (MM1), or for

Review comment:
       I think it's clearer to say `replication.policy.class` here, since that is the form in which it's configured in the mm2.properties file.

   That would be friendlier to beginners.
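
For readers who want to see the property in context, here is a small sketch of how such a line might appear in an mm2.properties file and be read back. The key name comes from the comment above and the class name from the package and file path elsewhere in this thread; the `primary` and `backup` aliases are borrowed from the tests, and `Mm2PropertiesSketch` itself is hypothetical.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

class Mm2PropertiesSketch {
    // Illustrative contents of an mm2.properties file (cluster aliases are assumptions).
    static final String MM2_PROPERTIES =
        "clusters = primary, backup\n" +
        "replication.policy.class = org.apache.kafka.connect.mirror.IdentityReplicationPolicy\n";

    /** Parses the illustrative file contents with the standard java.util.Properties reader. */
    static Properties load() {
        Properties props = new Properties();
        try {
            props.load(new StringReader(MM2_PROPERTIES));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    public static void main(String[] args) {
        System.out.println(load().getProperty("replication.policy.class"));
    }
}
```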
   







[GitHub] [kafka] Justinwins commented on a change in pull request #10652: KAFKA-9726 IdentityReplicationPolicy

Posted by GitBox <gi...@apache.org>.
Justinwins commented on a change in pull request #10652:
URL: https://github.com/apache/kafka/pull/10652#discussion_r683330997



##########
File path: connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java
##########
@@ -175,6 +175,7 @@ public MirrorClientConfig clientConfig(String cluster) {
         props.putAll(stringsWithPrefix("header.converter"));
         props.putAll(stringsWithPrefix("task"));
         props.putAll(stringsWithPrefix("worker"));
+        props.putAll(stringsWithPrefix("replication.policy"));

Review comment:
       In MirrorClientConfig, it seems unnecessary to add replication.policy to the props.
   As I see it, org.apache.kafka.connect.mirror.MirrorClientConfig#replicationPolicy initialises the ReplicationPolicy instance,
   and MirrorClient then uses that instance, which is where the ReplicationPolicy actually takes effect.







[GitHub] [kafka] mdedetrich commented on pull request #10652: KAFKA-9726 IdentityReplicationPolicy

Posted by GitBox <gi...@apache.org>.
mdedetrich commented on pull request #10652:
URL: https://github.com/apache/kafka/pull/10652#issuecomment-849508419


   @ryannedolan Since https://github.com/apache/kafka/pull/10762 was merged, maybe it makes sense to rebase against the current `trunk`? Some of the tests in this PR have assert statements without failure messages, which have just been fixed (I believe you can copy some of those assert failure messages to make sure they're consistent).





[GitHub] [kafka] mimaison commented on pull request #10652: KAFKA-9726 IdentityReplicationPolicy

Posted by GitBox <gi...@apache.org>.
mimaison commented on pull request #10652:
URL: https://github.com/apache/kafka/pull/10652#issuecomment-872037198


   Thanks @ryannedolan, @mdedetrich and @ivanyu 





[GitHub] [kafka] ryannedolan commented on a change in pull request #10652: WIP KAFKA-9726 LegacyReplicationPolicy (Ryanne's version)

Posted by GitBox <gi...@apache.org>.
ryannedolan commented on a change in pull request #10652:
URL: https://github.com/apache/kafka/pull/10652#discussion_r637219748



##########
File path: connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/MirrorClient.java
##########
@@ -60,7 +60,7 @@
     private ReplicationPolicy replicationPolicy;
     private Map<String, Object> consumerConfig;
 
-    public MirrorClient(Map<String, Object> props) {
+    public MirrorClient(Map<String, ?> props) {

Review comment:
       This is an incidental API change, but I don't _think_ it is a breaking change.
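
A small sketch of why widening the parameter is at least source-compatible for callers: anything that compiled against `Map<String, Object>` still compiles against `Map<String, ?>`, and maps with other value types are accepted as well. Both signatures erase to the same raw `Map` parameter, so already-compiled callers should link unchanged too. `WildcardParamSketch` and its two methods are hypothetical stand-ins for the constructor, and the property values are made up.

```java
import java.util.HashMap;
import java.util.Map;

class WildcardParamSketch {
    /** Stand-in for the old signature. */
    static int oldStyle(Map<String, Object> props) {
        return props.size();
    }

    /** Stand-in for the new signature. */
    static int newStyle(Map<String, ?> props) {
        return props.size();
    }

    public static void main(String[] args) {
        Map<String, Object> objects = new HashMap<>();
        objects.put("bootstrap.servers", "localhost:9092");

        Map<String, String> strings = new HashMap<>();
        strings.put("bootstrap.servers", "localhost:9092");

        System.out.println(oldStyle(objects)); // existing callers
        System.out.println(newStyle(objects)); // still compile against the wildcard form
        System.out.println(newStyle(strings)); // and Map<String, String> is now accepted
        // oldStyle(strings);                  // would not compile: generics are invariant
    }
}
```

The one thing the wildcard takes away is the ability to `put` into the map inside the method, which only matters to the implementation, not to callers.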







[GitHub] [kafka] mimaison commented on pull request #10652: KAFKA-9726 IdentityReplicationPolicy

Posted by GitBox <gi...@apache.org>.
mimaison commented on pull request #10652:
URL: https://github.com/apache/kafka/pull/10652#issuecomment-855723038


   @ryannedolan @mdedetrich Thanks, I'll try to take a look this week





[GitHub] [kafka] ryannedolan commented on pull request #10652: KAFKA-9726 IdentityReplicationPolicy

Posted by GitBox <gi...@apache.org>.
ryannedolan commented on pull request #10652:
URL: https://github.com/apache/kafka/pull/10652#issuecomment-869824087


   I think changelog and documentation updates can come after this is merged.





[GitHub] [kafka] mimaison merged pull request #10652: KAFKA-9726 IdentityReplicationPolicy

Posted by GitBox <gi...@apache.org>.
mimaison merged pull request #10652:
URL: https://github.com/apache/kafka/pull/10652


   





[GitHub] [kafka] mdedetrich commented on pull request #10652: KAFKA-9726 IdentityReplicationPolicy

Posted by GitBox <gi...@apache.org>.
mdedetrich commented on pull request #10652:
URL: https://github.com/apache/kafka/pull/10652#issuecomment-867460303


   @ryannedolan I actually just realized: does it make sense to add the `IdentityReplicationPolicy` to the release notes so that people are aware of it (talking about `docs/streams/upgrade-guide.html`)?





[GitHub] [kafka] ryannedolan commented on pull request #10652: KAFKA-9726 IdentityReplicationPolicy

Posted by GitBox <gi...@apache.org>.
ryannedolan commented on pull request #10652:
URL: https://github.com/apache/kafka/pull/10652#issuecomment-871707049


   > testOffsetSyncsTopicsOnTarget
   
   Yep good catch. Fixed!





[GitHub] [kafka] mimaison commented on a change in pull request #10652: KAFKA-9726 IdentityReplicationPolicy

Posted by GitBox <gi...@apache.org>.
mimaison commented on a change in pull request #10652:
URL: https://github.com/apache/kafka/pull/10652#discussion_r661687026



##########
File path: docs/upgrade.html
##########
@@ -80,7 +80,13 @@ <h5><a id="upgrade_300_notable" href="#upgrade_300_notable">Notable changes in 3
         understood by brokers or version 2.5 or higher, so you must upgrade your kafka cluster to get the stronger semantics. Otherwise, you can just pass
         in <code>new ConsumerGroupMetadata(consumerGroupId)</code> to work with older brokers. See <a href="https://cwiki.apache.org/confluence/x/zJONCg">KIP-732</a> for more details.
     </li>
-
+    <li> The Connect-based MirrorMaker (MM2) includes changes to support <code>IdentityReplicationPolicy</code>, enabling replication without renaming topics.

Review comment:
       Yes I think it's enough, thanks







[GitHub] [kafka] mimaison commented on pull request #10652: KAFKA-9726 IdentityReplicationPolicy

Posted by GitBox <gi...@apache.org>.
mimaison commented on pull request #10652:
URL: https://github.com/apache/kafka/pull/10652#issuecomment-867497775


   Yes, we can add a line in https://github.com/apache/kafka/blob/trunk/docs/upgrade.html#L85 to introduce this new ReplicationPolicy

