Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/09/21 14:59:45 UTC

[GitHub] [kafka] scanterog opened a new pull request #9313: [mm2] Fix consumer/producer/admin properties override

scanterog opened a new pull request #9313:
URL: https://github.com/apache/kafka/pull/9313


   Currently the producer/consumer properties override for the MirrorSourceTask and OffsetSyncStore does not work. This is because the props stored in MirrorConnectorConfig have a `target.cluster` or `source.cluster` prefix. For example, [sourceConsumerConfig](https://github.com/apache/kafka/blob/aa0cd667bcd5c4025e84097192030b59165ac9d0/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorConnectorConfig.java#L240) will strip this prefix and store all props as:
   
   ```
   producer.bootstrap.servers -> localhost:9092
   consumer.auto.offset.reset -> latest
   alias -> A
   bootstrap.servers -> localhost:9092
   admin.bootstrap.servers -> localhost:9092
   consumer.bootstrap.servers -> localhost:9092
   ```
   
   The next line, `props.keySet().retainAll(MirrorClientConfig.CLIENT_CONFIG_DEF.names());`, strips all the props not defined in the common CLIENT_CONFIG_DEF definition. That step is not relevant here.
   
   Finally, `props.putAll(originalsWithPrefix(CONSUMER_CLIENT_PREFIX));` is based on the `originals` variable, whose keys have the `target.cluster` or `source.cluster` prefix. There's not a single property with the "consumer." prefix. This pattern repeats with the producer/admin config.
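
The lookup miss described above can be reproduced without Kafka on the classpath. The following is a minimal sketch, not the MirrorConnectorConfig code: `withPrefix` is a hypothetical stand-in that mimics the behaviour of `originalsWithPrefix` (keep the keys that start with the prefix and strip it), and `sampleOriginals` with its values is made up for illustration.

```java
import java.util.HashMap;
import java.util.Map;

class PrefixOverrideDemo {
    // Hypothetical stand-in for originalsWithPrefix: keeps only the entries
    // whose key starts with `prefix` and strips that prefix from the key.
    static Map<String, String> withPrefix(Map<String, String> originals, String prefix) {
        Map<String, String> result = new HashMap<>();
        for (Map.Entry<String, String> e : originals.entrySet()) {
            if (e.getKey().startsWith(prefix)) {
                result.put(e.getKey().substring(prefix.length()), e.getValue());
            }
        }
        return result;
    }

    // Illustrative connector properties in which every key carries the
    // `source.cluster.` prefix (keys and values are assumptions).
    static Map<String, String> sampleOriginals() {
        Map<String, String> originals = new HashMap<>();
        originals.put("source.cluster.bootstrap.servers", "localhost:9092");
        originals.put("source.cluster.consumer.auto.offset.reset", "latest");
        return originals;
    }

    public static void main(String[] args) {
        Map<String, String> originals = sampleOriginals();
        // The bare "consumer." prefix matches no key in the originals.
        System.out.println(withPrefix(originals, "consumer."));
        // The fully qualified prefix finds the override.
        System.out.println(withPrefix(originals, "source.cluster.consumer."));
    }
}
```

In this sketch the first lookup returns an empty map and the second returns the `auto.offset.reset` override, which is the mismatch the description attributes to the bare `consumer.` prefix.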
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] scanterog commented on a change in pull request #9313: [mm2] Fix consumer/producer properties override

Posted by GitBox <gi...@apache.org>.
scanterog commented on a change in pull request #9313:
URL: https://github.com/apache/kafka/pull/9313#discussion_r495871017



##########
File path: connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorConnectorConfig.java
##########
@@ -199,8 +199,8 @@
 
     protected static final String SOURCE_CLUSTER_PREFIX = MirrorMakerConfig.SOURCE_CLUSTER_PREFIX;
     protected static final String TARGET_CLUSTER_PREFIX = MirrorMakerConfig.TARGET_CLUSTER_PREFIX;
-    protected static final String PRODUCER_CLIENT_PREFIX = "producer.";
-    protected static final String CONSUMER_CLIENT_PREFIX = "consumer.";
+    protected static final String PRODUCER_CLIENT_PREFIX = SOURCE_CLUSTER_PREFIX + "producer.";
+    protected static final String CONSUMER_CLIENT_PREFIX = SOURCE_CLUSTER_PREFIX + "consumer.";

Review comment:
       I'm not sure if I follow it. ~This prefix "source_cluster" is not user specified. It is prefixed somewhere else by mm2 (I can get the lines if you want later on).~ The config specified [here](https://github.com/apache/kafka/tree/trunk/connect/mirror#producer--consumer--admin-config-used-by-mm2) will actually be honored. It does not imply changes on the MirrorMaker config side at least. Are you talking about the case of running MirrorMaker in a connect cluster rather than as a dedicated cluster? I didn't check that one TBH. Maybe @ryannedolan has more insight on this one?







[GitHub] [kafka] mimaison commented on a change in pull request #9313: [mm2] Fix consumer/producer properties override

Posted by GitBox <gi...@apache.org>.
mimaison commented on a change in pull request #9313:
URL: https://github.com/apache/kafka/pull/9313#discussion_r502264321



##########
File path: connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorMakerConfigTest.java
##########
@@ -52,10 +52,10 @@ public void testClusterConfigProperties() {
             "replication.factor", "4"));
         Map<String, String> connectorProps = mirrorConfig.connectorBaseConfig(new SourceAndTarget("a", "b"),
             MirrorSourceConnector.class);
-        assertEquals("source.cluster.bootstrap.servers is set",
-            "servers-one", connectorProps.get("source.cluster.bootstrap.servers"));
-        assertEquals("target.cluster.bootstrap.servers is set",
-            "servers-two", connectorProps.get("target.cluster.bootstrap.servers"));
+        assertEquals("source.bootstrap.servers is set",

Review comment:
       I was not expecting to see changes in existing tests if we are not breaking compatibility. `source.cluster.bootstrap.servers` should continue working and still be tested.







[GitHub] [kafka] mimaison commented on pull request #9313: [mm2] Fix consumer/producer properties override

Posted by GitBox <gi...@apache.org>.
mimaison commented on pull request #9313:
URL: https://github.com/apache/kafka/pull/9313#issuecomment-705563795


   @scanterog 
   `testClusterConfigProperties()` seems to be failing now. Can you fix it before I take a look at the PR?
   
   ```
   org.apache.kafka.connect.mirror.MirrorMakerConfigTest > testClusterConfigProperties FAILED
       java.lang.AssertionError: source.cluster.bootstrap.servers is set expected:<servers-one> but was:<null>
           at org.junit.Assert.fail(Assert.java:89)
           at org.junit.Assert.failNotEquals(Assert.java:835)
           at org.junit.Assert.assertEquals(Assert.java:120)
           at org.apache.kafka.connect.mirror.MirrorMakerConfigTest.testClusterConfigProperties(MirrorMakerConfigTest.java:55)
   ```





[GitHub] [kafka] mimaison commented on a change in pull request #9313: [mm2] Fix consumer/producer properties override

Posted by GitBox <gi...@apache.org>.
mimaison commented on a change in pull request #9313:
URL: https://github.com/apache/kafka/pull/9313#discussion_r498290122



##########
File path: connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorConnectorConfig.java
##########
@@ -199,8 +199,8 @@
 
     protected static final String SOURCE_CLUSTER_PREFIX = MirrorMakerConfig.SOURCE_CLUSTER_PREFIX;
     protected static final String TARGET_CLUSTER_PREFIX = MirrorMakerConfig.TARGET_CLUSTER_PREFIX;
-    protected static final String PRODUCER_CLIENT_PREFIX = "producer.";
-    protected static final String CONSUMER_CLIENT_PREFIX = "consumer.";
+    protected static final String PRODUCER_CLIENT_PREFIX = SOURCE_CLUSTER_PREFIX + "producer.";
+    protected static final String CONSUMER_CLIENT_PREFIX = SOURCE_CLUSTER_PREFIX + "consumer.";

Review comment:
       It's unfortunate that for admin we use `source.admin` as the prefix ...
   
   So we'd be left with a configuration like:
   ```
   "source.cluster.bootstrap.servers": "localhost:9092",
   "source.cluster.security.protocol": "SASL_SSL",
   "source.cluster.producer.some-producer-setting": 123,
   "source.cluster.consumer.some-consumer-setting": 123,
   "source.admin.some-admin-setting": 123
   ```
   
   







[GitHub] [kafka] scanterog commented on pull request #9313: [mm2] Fix consumer/producer properties override

Posted by GitBox <gi...@apache.org>.
scanterog commented on pull request #9313:
URL: https://github.com/apache/kafka/pull/9313#issuecomment-710022885


   > @scanterog I've rekicked the build
   
   I think this is ready to merge :P








[GitHub] [kafka] scanterog commented on a change in pull request #9313: [mm2] Fix consumer/producer properties override

Posted by GitBox <gi...@apache.org>.
scanterog commented on a change in pull request #9313:
URL: https://github.com/apache/kafka/pull/9313#discussion_r500566839



##########
File path: connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorConnectorConfig.java
##########
@@ -199,8 +199,8 @@
 
     protected static final String SOURCE_CLUSTER_PREFIX = MirrorMakerConfig.SOURCE_CLUSTER_PREFIX;
     protected static final String TARGET_CLUSTER_PREFIX = MirrorMakerConfig.TARGET_CLUSTER_PREFIX;
-    protected static final String PRODUCER_CLIENT_PREFIX = "producer.";
-    protected static final String CONSUMER_CLIENT_PREFIX = "consumer.";
+    protected static final String PRODUCER_CLIENT_PREFIX = SOURCE_CLUSTER_PREFIX + "producer.";
+    protected static final String CONSUMER_CLIENT_PREFIX = SOURCE_CLUSTER_PREFIX + "consumer.";

Review comment:
       For users running on a Connect cluster, that format is not supported right now:
   ```
   source.producer.some-producer-setting: 123
   ```
   
   The only supported format is (unless I'm missing something):
   ```
   "producer.some-producer-setting": 123
   ```







[GitHub] [kafka] scanterog commented on a change in pull request #9313: [mm2] Fix consumer/producer properties override

Posted by GitBox <gi...@apache.org>.
scanterog commented on a change in pull request #9313:
URL: https://github.com/apache/kafka/pull/9313#discussion_r501326680



##########
File path: connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorConnectorConfig.java
##########
@@ -199,8 +199,8 @@
 
     protected static final String SOURCE_CLUSTER_PREFIX = MirrorMakerConfig.SOURCE_CLUSTER_PREFIX;
     protected static final String TARGET_CLUSTER_PREFIX = MirrorMakerConfig.TARGET_CLUSTER_PREFIX;
-    protected static final String PRODUCER_CLIENT_PREFIX = "producer.";
-    protected static final String CONSUMER_CLIENT_PREFIX = "consumer.";
+    protected static final String PRODUCER_CLIENT_PREFIX = SOURCE_CLUSTER_PREFIX + "producer.";
+    protected static final String CONSUMER_CLIENT_PREFIX = SOURCE_CLUSTER_PREFIX + "consumer.";

Review comment:
       I think the new code does what we want. I also tested a binary built from this on my infra and it is working fine. Let me know if this works for you.







[GitHub] [kafka] scanterog commented on a change in pull request #9313: [mm2] Fix consumer/producer properties override

Posted by GitBox <gi...@apache.org>.
scanterog commented on a change in pull request #9313:
URL: https://github.com/apache/kafka/pull/9313#discussion_r497079320



##########
File path: connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorConnectorConfig.java
##########
@@ -199,8 +199,8 @@
 
     protected static final String SOURCE_CLUSTER_PREFIX = MirrorMakerConfig.SOURCE_CLUSTER_PREFIX;
     protected static final String TARGET_CLUSTER_PREFIX = MirrorMakerConfig.TARGET_CLUSTER_PREFIX;
-    protected static final String PRODUCER_CLIENT_PREFIX = "producer.";
-    protected static final String CONSUMER_CLIENT_PREFIX = "consumer.";
+    protected static final String PRODUCER_CLIENT_PREFIX = SOURCE_CLUSTER_PREFIX + "producer.";
+    protected static final String CONSUMER_CLIENT_PREFIX = SOURCE_CLUSTER_PREFIX + "consumer.";

Review comment:
       I have put a new fix that should handle both cases. Details on why I think this is a good fix:
   * There's no change to what is defined [here](https://github.com/apache/kafka/tree/trunk/connect/mirror#producer--consumer--admin-config-used-by-mm2). The properties that start with `producer.` or `consumer.` are still ignored in mm2 dedicated mode.
   * MM2 needs to differentiate the properties for the source cluster and we can't just remove the `source.cluster` prefix.
   * For running the mirror connectors on a Connect cluster, the only "drawback" is that now you can provide an override with `source.cluster.consumer` or `source.cluster.producer`.







[GitHub] [kafka] scanterog commented on pull request #9313: [mm2] Fix consumer/producer/admin properties override

Posted by GitBox <gi...@apache.org>.
scanterog commented on pull request #9313:
URL: https://github.com/apache/kafka/pull/9313#issuecomment-696271853


   Thanks @ryannedolan! what is the usual procedure to get this merged?





[GitHub] [kafka] scanterog commented on a change in pull request #9313: [mm2] Fix consumer/producer properties override

Posted by GitBox <gi...@apache.org>.
scanterog commented on a change in pull request #9313:
URL: https://github.com/apache/kafka/pull/9313#discussion_r497079320



##########
File path: connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorConnectorConfig.java
##########
@@ -199,8 +199,8 @@
 
     protected static final String SOURCE_CLUSTER_PREFIX = MirrorMakerConfig.SOURCE_CLUSTER_PREFIX;
     protected static final String TARGET_CLUSTER_PREFIX = MirrorMakerConfig.TARGET_CLUSTER_PREFIX;
-    protected static final String PRODUCER_CLIENT_PREFIX = "producer.";
-    protected static final String CONSUMER_CLIENT_PREFIX = "consumer.";
+    protected static final String PRODUCER_CLIENT_PREFIX = SOURCE_CLUSTER_PREFIX + "producer.";
+    protected static final String CONSUMER_CLIENT_PREFIX = SOURCE_CLUSTER_PREFIX + "consumer.";

Review comment:
       I have put a new fix that should handle both cases. Details on why I think this is a reasonable fix:
   * There's no change to what is defined [here](https://github.com/apache/kafka/tree/trunk/connect/mirror#producer--consumer--admin-config-used-by-mm2). The properties that start with `producer.` or `consumer.` are still ignored in mm2 dedicated mode.
   * MM2 needs to differentiate the properties for the source cluster and we can't just remove the `source.cluster` prefix.
   * For running the mirror connectors on a Connect cluster, the only "drawback" is that now you can provide an override with `source.cluster.consumer` or `source.cluster.producer`.







[GitHub] [kafka] scanterog commented on a change in pull request #9313: [mm2] Fix consumer/producer properties override

Posted by GitBox <gi...@apache.org>.
scanterog commented on a change in pull request #9313:
URL: https://github.com/apache/kafka/pull/9313#discussion_r500569567



##########
File path: connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorConnectorConfig.java
##########
@@ -199,8 +199,8 @@
 
     protected static final String SOURCE_CLUSTER_PREFIX = MirrorMakerConfig.SOURCE_CLUSTER_PREFIX;
     protected static final String TARGET_CLUSTER_PREFIX = MirrorMakerConfig.TARGET_CLUSTER_PREFIX;
-    protected static final String PRODUCER_CLIENT_PREFIX = "producer.";
-    protected static final String CONSUMER_CLIENT_PREFIX = "consumer.";
+    protected static final String PRODUCER_CLIENT_PREFIX = SOURCE_CLUSTER_PREFIX + "producer.";
+    protected static final String CONSUMER_CLIENT_PREFIX = SOURCE_CLUSTER_PREFIX + "consumer.";

Review comment:
       🤦 got it. I misunderstood it. Let me give it a try.







[GitHub] [kafka] scanterog commented on pull request #9313: [mm2] Fix consumer/producer properties override

Posted by GitBox <gi...@apache.org>.
scanterog commented on pull request #9313:
URL: https://github.com/apache/kafka/pull/9313#issuecomment-698983392


   @mimaison just added it. Let me know if there's anything else to fix. Thanks!





[GitHub] [kafka] mimaison merged pull request #9313: [mm2] Fix consumer/producer properties override

Posted by GitBox <gi...@apache.org>.
mimaison merged pull request #9313:
URL: https://github.com/apache/kafka/pull/9313


   





[GitHub] [kafka] scanterog commented on a change in pull request #9313: [mm2] Fix consumer/producer properties override

Posted by GitBox <gi...@apache.org>.
scanterog commented on a change in pull request #9313:
URL: https://github.com/apache/kafka/pull/9313#discussion_r495995224



##########
File path: connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorConnectorConfig.java
##########
@@ -199,8 +199,8 @@
 
     protected static final String SOURCE_CLUSTER_PREFIX = MirrorMakerConfig.SOURCE_CLUSTER_PREFIX;
     protected static final String TARGET_CLUSTER_PREFIX = MirrorMakerConfig.TARGET_CLUSTER_PREFIX;
-    protected static final String PRODUCER_CLIENT_PREFIX = "producer.";
-    protected static final String CONSUMER_CLIENT_PREFIX = "consumer.";
+    protected static final String PRODUCER_CLIENT_PREFIX = SOURCE_CLUSTER_PREFIX + "producer.";
+    protected static final String CONSUMER_CLIENT_PREFIX = SOURCE_CLUSTER_PREFIX + "consumer.";

Review comment:
       Gotcha. I just peeked at it again and you're right. It seems the proper fix would be to change the way mm2 populates the MirrorConnectorConfig to avoid this change. WDYT @ryannedolan? Not sure how involved this change would be.







[GitHub] [kafka] mimaison commented on pull request #9313: [mm2] Fix consumer/producer properties override

Posted by GitBox <gi...@apache.org>.
mimaison commented on pull request #9313:
URL: https://github.com/apache/kafka/pull/9313#issuecomment-709415966


   @scanterog I've rekicked the build





[GitHub] [kafka] scanterog commented on a change in pull request #9313: [mm2] Fix consumer/producer properties override

Posted by GitBox <gi...@apache.org>.
scanterog commented on a change in pull request #9313:
URL: https://github.com/apache/kafka/pull/9313#discussion_r502351930



##########
File path: connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorMakerConfigTest.java
##########
@@ -52,10 +52,10 @@ public void testClusterConfigProperties() {
             "replication.factor", "4"));
         Map<String, String> connectorProps = mirrorConfig.connectorBaseConfig(new SourceAndTarget("a", "b"),
             MirrorSourceConnector.class);
-        assertEquals("source.cluster.bootstrap.servers is set",
-            "servers-one", connectorProps.get("source.cluster.bootstrap.servers"));
-        assertEquals("target.cluster.bootstrap.servers is set",
-            "servers-two", connectorProps.get("target.cluster.bootstrap.servers"));
+        assertEquals("source.bootstrap.servers is set",

Review comment:
       That will require more changes. This does not break for the MirrorMaker driver main config. I guess it does on Connect, though.







[GitHub] [kafka] scanterog commented on a change in pull request #9313: [mm2] Fix consumer/producer properties override

Posted by GitBox <gi...@apache.org>.
scanterog commented on a change in pull request #9313:
URL: https://github.com/apache/kafka/pull/9313#discussion_r497079320



##########
File path: connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorConnectorConfig.java
##########
@@ -199,8 +199,8 @@
 
     protected static final String SOURCE_CLUSTER_PREFIX = MirrorMakerConfig.SOURCE_CLUSTER_PREFIX;
     protected static final String TARGET_CLUSTER_PREFIX = MirrorMakerConfig.TARGET_CLUSTER_PREFIX;
-    protected static final String PRODUCER_CLIENT_PREFIX = "producer.";
-    protected static final String CONSUMER_CLIENT_PREFIX = "consumer.";
+    protected static final String PRODUCER_CLIENT_PREFIX = SOURCE_CLUSTER_PREFIX + "producer.";
+    protected static final String CONSUMER_CLIENT_PREFIX = SOURCE_CLUSTER_PREFIX + "consumer.";

Review comment:
       I have put a new fix that should handle both cases.







[GitHub] [kafka] scanterog commented on a change in pull request #9313: [mm2] Fix consumer/producer properties override

Posted by GitBox <gi...@apache.org>.
scanterog commented on a change in pull request #9313:
URL: https://github.com/apache/kafka/pull/9313#discussion_r500464804



##########
File path: connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorConnectorConfig.java
##########
@@ -199,8 +199,8 @@
 
     protected static final String SOURCE_CLUSTER_PREFIX = MirrorMakerConfig.SOURCE_CLUSTER_PREFIX;
     protected static final String TARGET_CLUSTER_PREFIX = MirrorMakerConfig.TARGET_CLUSTER_PREFIX;
-    protected static final String PRODUCER_CLIENT_PREFIX = "producer.";
-    protected static final String CONSUMER_CLIENT_PREFIX = "consumer.";
+    protected static final String PRODUCER_CLIENT_PREFIX = SOURCE_CLUSTER_PREFIX + "producer.";
+    protected static final String CONSUMER_CLIENT_PREFIX = SOURCE_CLUSTER_PREFIX + "consumer.";

Review comment:
       @mimaison just a friendly ping.







[GitHub] [kafka] scanterog edited a comment on pull request #9313: [mm2] Fix consumer/producer properties override

Posted by GitBox <gi...@apache.org>.
scanterog edited a comment on pull request #9313:
URL: https://github.com/apache/kafka/pull/9313#issuecomment-709412233


   > Thanks @scanterog, the changes look good now. I've left a couple of very minor comments that would be nice to address. I'll be happy to merge then
   
   Thanks. There's one test failing for JDK15 apparently (need to check if it is legit). Works locally with JDK11. I wish I could rerun those tests. Many flaky ones.





[GitHub] [kafka] scanterog commented on pull request #9313: [mm2] Fix consumer/producer/admin properties override

Posted by GitBox <gi...@apache.org>.
scanterog commented on pull request #9313:
URL: https://github.com/apache/kafka/pull/9313#issuecomment-696259251


   Hello @ryannedolan @mimaison. Your review will be appreciated. I think the test failure is unrelated. Thanks!





[GitHub] [kafka] ryannedolan commented on pull request #9313: [mm2] Fix consumer/producer/admin properties override

Posted by GitBox <gi...@apache.org>.
ryannedolan commented on pull request #9313:
URL: https://github.com/apache/kafka/pull/9313#issuecomment-696275095


   > Thanks @ryannedolan! what is the usual procedure to get this merged?
   
   A committer must approve and merge.





[GitHub] [kafka] scanterog commented on a change in pull request #9313: [mm2] Fix consumer/producer properties override

Posted by GitBox <gi...@apache.org>.
scanterog commented on a change in pull request #9313:
URL: https://github.com/apache/kafka/pull/9313#discussion_r498299493



##########
File path: connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorConnectorConfig.java
##########
@@ -199,8 +199,8 @@
 
     protected static final String SOURCE_CLUSTER_PREFIX = MirrorMakerConfig.SOURCE_CLUSTER_PREFIX;
     protected static final String TARGET_CLUSTER_PREFIX = MirrorMakerConfig.TARGET_CLUSTER_PREFIX;
-    protected static final String PRODUCER_CLIENT_PREFIX = "producer.";
-    protected static final String CONSUMER_CLIENT_PREFIX = "consumer.";
+    protected static final String PRODUCER_CLIENT_PREFIX = SOURCE_CLUSTER_PREFIX + "producer.";
+    protected static final String CONSUMER_CLIENT_PREFIX = SOURCE_CLUSTER_PREFIX + "consumer.";

Review comment:
       For Connect, you can still use the following (or the one you've shared):
   ```
   "source.cluster.bootstrap.servers": "localhost:9092",
   "source.cluster.security.protocol": "SASL_SSL",
   "producer.some-producer-setting": 123,
   "consumer.some-consumer-setting": 123,
   "source.admin.some-admin-setting": 123
   ```
   
   Yes, we could normalize that but I'm not touching admin config in this PR.







[GitHub] [kafka] scanterog commented on a change in pull request #9313: [mm2] Fix consumer/producer properties override

Posted by GitBox <gi...@apache.org>.
scanterog commented on a change in pull request #9313:
URL: https://github.com/apache/kafka/pull/9313#discussion_r505512942



##########
File path: connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java
##########
@@ -245,10 +254,17 @@ public MirrorClientConfig clientConfig(String cluster) {
         Map<String, String> strings = originalsStrings();
         strings.keySet().removeIf(x -> !x.startsWith(prefix));
         return strings;
-    } 
+    }
+
+    static Map<String, String> prefixForGeneralAttrs(String prefix, Map<String, String> props) {

Review comment:
       Indeed. Thanks!




----------------------------------------------------------------



[GitHub] [kafka] ryannedolan commented on pull request #9313: [mm2] Fix consumer/producer/admin properties override

Posted by GitBox <gi...@apache.org>.
ryannedolan commented on pull request #9313:
URL: https://github.com/apache/kafka/pull/9313#issuecomment-696275095


   > Thanks @ryannedolan! what is the usual procedure to get this merged?
   
   A committer must approve and merge.


----------------------------------------------------------------



[GitHub] [kafka] mimaison commented on pull request #9313: [mm2] Fix consumer/producer properties override

Posted by GitBox <gi...@apache.org>.
mimaison commented on pull request #9313:
URL: https://github.com/apache/kafka/pull/9313#issuecomment-698229647


   @scanterog Thanks for the PR. Can you add a test for the changes?


----------------------------------------------------------------



[GitHub] [kafka] mimaison commented on pull request #9313: [mm2] Fix consumer/producer properties override

Posted by GitBox <gi...@apache.org>.
mimaison commented on pull request #9313:
URL: https://github.com/apache/kafka/pull/9313#issuecomment-710160841


   Test failures are not related, merging to trunk


----------------------------------------------------------------



[GitHub] [kafka] mimaison commented on a change in pull request #9313: [mm2] Fix consumer/producer properties override

Posted by GitBox <gi...@apache.org>.
mimaison commented on a change in pull request #9313:
URL: https://github.com/apache/kafka/pull/9313#discussion_r502264321



##########
File path: connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorMakerConfigTest.java
##########
@@ -52,10 +52,10 @@ public void testClusterConfigProperties() {
             "replication.factor", "4"));
         Map<String, String> connectorProps = mirrorConfig.connectorBaseConfig(new SourceAndTarget("a", "b"),
             MirrorSourceConnector.class);
-        assertEquals("source.cluster.bootstrap.servers is set",
-            "servers-one", connectorProps.get("source.cluster.bootstrap.servers"));
-        assertEquals("target.cluster.bootstrap.servers is set",
-            "servers-two", connectorProps.get("target.cluster.bootstrap.servers"));
+        assertEquals("source.bootstrap.servers is set",

Review comment:
       I was not expecting to see changes in existing tests if we are not breaking compatibility. `source.cluster.bootstrap.servers` should continue working and still be tested.




----------------------------------------------------------------



[GitHub] [kafka] mimaison commented on a change in pull request #9313: [mm2] Fix consumer/producer properties override

Posted by GitBox <gi...@apache.org>.
mimaison commented on a change in pull request #9313:
URL: https://github.com/apache/kafka/pull/9313#discussion_r500562684



##########
File path: connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorConnectorConfig.java
##########
@@ -199,8 +199,8 @@
 
     protected static final String SOURCE_CLUSTER_PREFIX = MirrorMakerConfig.SOURCE_CLUSTER_PREFIX;
     protected static final String TARGET_CLUSTER_PREFIX = MirrorMakerConfig.TARGET_CLUSTER_PREFIX;
-    protected static final String PRODUCER_CLIENT_PREFIX = "producer.";
-    protected static final String CONSUMER_CLIENT_PREFIX = "consumer.";
+    protected static final String PRODUCER_CLIENT_PREFIX = SOURCE_CLUSTER_PREFIX + "producer.";
+    protected static final String CONSUMER_CLIENT_PREFIX = SOURCE_CLUSTER_PREFIX + "consumer.";

Review comment:
       I don't think the format mentioned in https://github.com/apache/kafka/pull/9313#discussion_r498298987 would break compatibility. 
   
   




----------------------------------------------------------------



[GitHub] [kafka] scanterog commented on a change in pull request #9313: [mm2] Fix consumer/producer properties override

Posted by GitBox <gi...@apache.org>.
scanterog commented on a change in pull request #9313:
URL: https://github.com/apache/kafka/pull/9313#discussion_r495995224



##########
File path: connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorConnectorConfig.java
##########
@@ -199,8 +199,8 @@
 
     protected static final String SOURCE_CLUSTER_PREFIX = MirrorMakerConfig.SOURCE_CLUSTER_PREFIX;
     protected static final String TARGET_CLUSTER_PREFIX = MirrorMakerConfig.TARGET_CLUSTER_PREFIX;
-    protected static final String PRODUCER_CLIENT_PREFIX = "producer.";
-    protected static final String CONSUMER_CLIENT_PREFIX = "consumer.";
+    protected static final String PRODUCER_CLIENT_PREFIX = SOURCE_CLUSTER_PREFIX + "producer.";
+    protected static final String CONSUMER_CLIENT_PREFIX = SOURCE_CLUSTER_PREFIX + "consumer.";

Review comment:
       Gotcha. I just peeked at it again and you're right. It seems the proper fix would be to change the way mm2 populates the MirrorConnectorConfig to avoid this change. WDYT @ryannedolan? Not sure how involved this change would be.




----------------------------------------------------------------



[GitHub] [kafka] mimaison commented on a change in pull request #9313: [mm2] Fix consumer/producer properties override

Posted by GitBox <gi...@apache.org>.
mimaison commented on a change in pull request #9313:
URL: https://github.com/apache/kafka/pull/9313#discussion_r498291916



##########
File path: connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorConfigTest.java
##########
@@ -119,4 +120,41 @@ public void testNonMutationOfConfigDef() {
             connectorConfigDef.names().contains(taskSpecificProperty)
         ));
     }
+
+    @Test
+    public void testSourceConsumerConfig() {

Review comment:
       We should test settings with the `source.cluster.` prefix too. Same in the test below




----------------------------------------------------------------



[GitHub] [kafka] scanterog commented on a change in pull request #9313: [mm2] Fix consumer/producer properties override

Posted by GitBox <gi...@apache.org>.
scanterog commented on a change in pull request #9313:
URL: https://github.com/apache/kafka/pull/9313#discussion_r503489280



##########
File path: connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorMakerConfigTest.java
##########
@@ -52,10 +52,10 @@ public void testClusterConfigProperties() {
             "replication.factor", "4"));
         Map<String, String> connectorProps = mirrorConfig.connectorBaseConfig(new SourceAndTarget("a", "b"),
             MirrorSourceConnector.class);
-        assertEquals("source.cluster.bootstrap.servers is set",
-            "servers-one", connectorProps.get("source.cluster.bootstrap.servers"));
-        assertEquals("target.cluster.bootstrap.servers is set",
-            "servers-two", connectorProps.get("target.cluster.bootstrap.servers"));
+        assertEquals("source.bootstrap.servers is set",

Review comment:
       I think I have achieved what we want: explicitly setting the "source." prefix for props starting with consumer|producer|admin and setting "source.cluster." otherwise. All tests pass and the code runs fine on my infra.
   
   @mimaison please take a look whenever you get some time :)
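
   As a rough illustration of the prefixing rule described above (not the PR's actual code), the sketch below routes keys starting with `consumer.`, `producer.` or `admin.` under `source.` and everything else under `source.cluster.`. The class name, method names and constants are hypothetical and exist only for this example.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch; names are illustrative, not taken from the PR.
final class SourcePrefixSketch {
    static final String SOURCE_PREFIX = "source.";
    static final String SOURCE_CLUSTER_PREFIX = "source.cluster.";

    // True for keys that configure one specific client type.
    static boolean isClientSpecific(String key) {
        return key.startsWith("consumer.")
            || key.startsWith("producer.")
            || key.startsWith("admin.");
    }

    // Client-specific keys get "source.", all other keys get "source.cluster.".
    static Map<String, String> withSourcePrefixes(Map<String, String> clusterProps) {
        Map<String, String> out = new LinkedHashMap<>();
        for (Map.Entry<String, String> e : clusterProps.entrySet()) {
            String prefix = isClientSpecific(e.getKey()) ? SOURCE_PREFIX : SOURCE_CLUSTER_PREFIX;
            out.put(prefix + e.getKey(), e.getValue());
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, String> props = new LinkedHashMap<>();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("consumer.auto.offset.reset", "latest");
        System.out.println(withSourcePrefixes(props));
    }
}
```

   With this split, cluster-wide settings such as `bootstrap.servers` keep the existing `source.cluster.` form, while client overrides stay distinguishable by client type.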




----------------------------------------------------------------



[GitHub] [kafka] scanterog commented on a change in pull request #9313: [mm2] Fix consumer/producer properties override

Posted by GitBox <gi...@apache.org>.
scanterog commented on a change in pull request #9313:
URL: https://github.com/apache/kafka/pull/9313#discussion_r498971069



##########
File path: connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorConnectorConfig.java
##########
@@ -199,8 +199,8 @@
 
     protected static final String SOURCE_CLUSTER_PREFIX = MirrorMakerConfig.SOURCE_CLUSTER_PREFIX;
     protected static final String TARGET_CLUSTER_PREFIX = MirrorMakerConfig.TARGET_CLUSTER_PREFIX;
-    protected static final String PRODUCER_CLIENT_PREFIX = "producer.";
-    protected static final String CONSUMER_CLIENT_PREFIX = "consumer.";
+    protected static final String PRODUCER_CLIENT_PREFIX = SOURCE_CLUSTER_PREFIX + "producer.";
+    protected static final String CONSUMER_CLIENT_PREFIX = SOURCE_CLUSTER_PREFIX + "consumer.";

Review comment:
       > Maybe not including cluster for all the client specific settings would be slightly better, WDYT?
   
   Somehow I missed this. I do not have a strong opinion on that, but I do like the idea of normalizing it. Changing that will still break existing users though. How do you want to proceed?




----------------------------------------------------------------



[GitHub] [kafka] scanterog commented on a change in pull request #9313: [mm2] Fix consumer/producer properties override

Posted by GitBox <gi...@apache.org>.
scanterog commented on a change in pull request #9313:
URL: https://github.com/apache/kafka/pull/9313#discussion_r497079320



##########
File path: connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorConnectorConfig.java
##########
@@ -199,8 +199,8 @@
 
     protected static final String SOURCE_CLUSTER_PREFIX = MirrorMakerConfig.SOURCE_CLUSTER_PREFIX;
     protected static final String TARGET_CLUSTER_PREFIX = MirrorMakerConfig.TARGET_CLUSTER_PREFIX;
-    protected static final String PRODUCER_CLIENT_PREFIX = "producer.";
-    protected static final String CONSUMER_CLIENT_PREFIX = "consumer.";
+    protected static final String PRODUCER_CLIENT_PREFIX = SOURCE_CLUSTER_PREFIX + "producer.";
+    protected static final String CONSUMER_CLIENT_PREFIX = SOURCE_CLUSTER_PREFIX + "consumer.";

Review comment:
       I have put a new fix that should handle both cases. Details on why I think this is a reasonable fix:
   * There's no change to what is defined [here](https://github.com/apache/kafka/tree/trunk/connect/mirror#producer--consumer--admin-config-used-by-mm2). The properties that start with `producer.` or `consumer.` are still ignored in mm2 dedicated mode.
   * MM2 needs to differentiate the properties for the source cluster and we can't just remove the `source.cluster` prefix.
   * For running the mirror connectors on a Connect cluster, the only "drawback" is that now you can provide an override with `source.cluster.consumer` or `source.cluster.producer`.
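
   One way to picture "handling both cases" is a lookup that accepts consumer overrides under either the plain `consumer.` prefix or the `source.cluster.consumer.` prefix. This is a minimal sketch under that assumption, not the code in the PR: the class and method names are hypothetical, and letting the `source.cluster.consumer.` form win on conflict is an arbitrary choice made for the example.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch; names and precedence are assumptions, not the PR's code.
final class ConsumerOverrideSketch {
    // Copy every entry whose key starts with `prefix`, with the prefix stripped.
    static Map<String, String> withPrefixStripped(Map<String, String> originals, String prefix) {
        Map<String, String> out = new HashMap<>();
        for (Map.Entry<String, String> e : originals.entrySet()) {
            String key = e.getKey();
            if (key.startsWith(prefix) && key.length() > prefix.length()) {
                out.put(key.substring(prefix.length()), e.getValue());
            }
        }
        return out;
    }

    // Collect consumer overrides from both supported key formats.
    static Map<String, String> sourceConsumerOverrides(Map<String, String> originals) {
        Map<String, String> props = new HashMap<>();
        // Keys written directly as "consumer.<setting>".
        props.putAll(withPrefixStripped(originals, "consumer."));
        // Keys written as "source.cluster.consumer.<setting>"; applied last, so they win (assumption).
        props.putAll(withPrefixStripped(originals, "source.cluster.consumer."));
        return props;
    }

    public static void main(String[] args) {
        Map<String, String> originals = new HashMap<>();
        originals.put("consumer.max.poll.interval.ms", "120000");
        originals.put("source.cluster.consumer.auto.offset.reset", "latest");
        originals.put("source.cluster.bootstrap.servers", "localhost:9092");
        System.out.println(sourceConsumerOverrides(originals));
    }
}
```

   Cluster-level keys such as `source.cluster.bootstrap.servers` match neither prefix, so they are left out of the override map.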




----------------------------------------------------------------



[GitHub] [kafka] mimaison commented on a change in pull request #9313: [mm2] Fix consumer/producer properties override

Posted by GitBox <gi...@apache.org>.
mimaison commented on a change in pull request #9313:
URL: https://github.com/apache/kafka/pull/9313#discussion_r500568357



##########
File path: connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorConnectorConfig.java
##########
@@ -199,8 +199,8 @@
 
     protected static final String SOURCE_CLUSTER_PREFIX = MirrorMakerConfig.SOURCE_CLUSTER_PREFIX;
     protected static final String TARGET_CLUSTER_PREFIX = MirrorMakerConfig.TARGET_CLUSTER_PREFIX;
-    protected static final String PRODUCER_CLIENT_PREFIX = "producer.";
-    protected static final String CONSUMER_CLIENT_PREFIX = "consumer.";
+    protected static final String PRODUCER_CLIENT_PREFIX = SOURCE_CLUSTER_PREFIX + "producer.";
+    protected static final String CONSUMER_CLIENT_PREFIX = SOURCE_CLUSTER_PREFIX + "consumer.";

Review comment:
       I meant add support for that format. We obviously want to keep supporting the existing formats




----------------------------------------------------------------



[GitHub] [kafka] mimaison commented on pull request #9313: [mm2] Fix consumer/producer properties override

Posted by GitBox <gi...@apache.org>.
mimaison commented on pull request #9313:
URL: https://github.com/apache/kafka/pull/9313#issuecomment-705563795


   @scanterog 
   `testClusterConfigProperties()` seems to be failing now. Can you fix it before I take a look at the PR?
   
   ```
   org.apache.kafka.connect.mirror.MirrorMakerConfigTest > testClusterConfigProperties FAILED
       java.lang.AssertionError: source.cluster.bootstrap.servers is set expected:<servers-one> but was:<null>
           at org.junit.Assert.fail(Assert.java:89)
           at org.junit.Assert.failNotEquals(Assert.java:835)
           at org.junit.Assert.assertEquals(Assert.java:120)
           at org.apache.kafka.connect.mirror.MirrorMakerConfigTest.testClusterConfigProperties(MirrorMakerConfigTest.java:55)
   ```


----------------------------------------------------------------



[GitHub] [kafka] scanterog commented on pull request #9313: [mm2] Fix consumer/producer properties override

Posted by GitBox <gi...@apache.org>.
scanterog commented on pull request #9313:
URL: https://github.com/apache/kafka/pull/9313#issuecomment-697968761


   @hachikuji is there any chance you can review this? Thanks!


----------------------------------------------------------------



[GitHub] [kafka] mimaison commented on a change in pull request #9313: [mm2] Fix consumer/producer properties override

Posted by GitBox <gi...@apache.org>.
mimaison commented on a change in pull request #9313:
URL: https://github.com/apache/kafka/pull/9313#discussion_r495975721



##########
File path: connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorConnectorConfig.java
##########
@@ -199,8 +199,8 @@
 
     protected static final String SOURCE_CLUSTER_PREFIX = MirrorMakerConfig.SOURCE_CLUSTER_PREFIX;
     protected static final String TARGET_CLUSTER_PREFIX = MirrorMakerConfig.TARGET_CLUSTER_PREFIX;
-    protected static final String PRODUCER_CLIENT_PREFIX = "producer.";
-    protected static final String CONSUMER_CLIENT_PREFIX = "consumer.";
+    protected static final String PRODUCER_CLIENT_PREFIX = SOURCE_CLUSTER_PREFIX + "producer.";
+    protected static final String CONSUMER_CLIENT_PREFIX = SOURCE_CLUSTER_PREFIX + "consumer.";

Review comment:
       Yes I'm running MM2 in a Connect cluster. 




----------------------------------------------------------------



[GitHub] [kafka] mimaison commented on a change in pull request #9313: [mm2] Fix consumer/producer properties override

Posted by GitBox <gi...@apache.org>.
mimaison commented on a change in pull request #9313:
URL: https://github.com/apache/kafka/pull/9313#discussion_r495817863



##########
File path: connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorConfigTest.java
##########
@@ -119,4 +120,41 @@ public void testNonMutationOfConfigDef() {
             connectorConfigDef.names().contains(taskSpecificProperty)
         ));
     }
+
+    @Test
+    public void testSourceConsumerConfig() {
+        Map<String, String> connectorProps = makeProps(
+                MirrorConnectorConfig.CONSUMER_CLIENT_PREFIX + "max.poll.interval.ms", "120000"
+        );
+        MirrorConnectorConfig config = new MirrorConnectorConfig(connectorProps);
+        Map<String, Object> connectorConsumerProps = config.sourceConsumerConfig();
+        Map<String, Object> expectedConsumerProps = new HashMap<>();
+        expectedConsumerProps.put("enable.auto.commit", "false");
+        expectedConsumerProps.put("auto.offset.reset", "earliest");
+        expectedConsumerProps.put("max.poll.interval.ms", "120000");
+        assertEquals(expectedConsumerProps, connectorConsumerProps);
+
+        // checking auto.offset.reset override works
+        connectorProps = makeProps(
+                MirrorConnectorConfig.CONSUMER_CLIENT_PREFIX + "auto.offset.reset", "latest"
+        );
+        config = new MirrorConnectorConfig(connectorProps);
+        connectorConsumerProps = config.sourceConsumerConfig();
+        expectedConsumerProps.put("auto.offset.reset", "latest");
+        expectedConsumerProps.remove("max.poll.interval.ms");
+        assertEquals(expectedConsumerProps, connectorConsumerProps);
+    }
+
+    @Test
+    public void testSourceProducerConfig() {
+        Map<String, String> connectorProps = makeProps(
+                MirrorConnectorConfig.PRODUCER_CLIENT_PREFIX + "acks", "1"
+        );
+        MirrorConnectorConfig config = new MirrorConnectorConfig(connectorProps);
+        Map<String, Object> connectorProducerProps = config.sourceProducerConfig();
+        Map<String, Object> expectedProducerProps = new HashMap<>();
+        expectedProducerProps.put("acks", "1");
+        assertEquals(expectedProducerProps, connectorProducerProps);
+    }
+

Review comment:
       While we're at it, could we also add tests for `targetAdminConfig()` and `sourceAdminConfig()`?

##########
File path: connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorConnectorConfig.java
##########
@@ -199,8 +199,8 @@
 
     protected static final String SOURCE_CLUSTER_PREFIX = MirrorMakerConfig.SOURCE_CLUSTER_PREFIX;
     protected static final String TARGET_CLUSTER_PREFIX = MirrorMakerConfig.TARGET_CLUSTER_PREFIX;
-    protected static final String PRODUCER_CLIENT_PREFIX = "producer.";
-    protected static final String CONSUMER_CLIENT_PREFIX = "consumer.";
+    protected static final String PRODUCER_CLIENT_PREFIX = SOURCE_CLUSTER_PREFIX + "producer.";
+    protected static final String CONSUMER_CLIENT_PREFIX = SOURCE_CLUSTER_PREFIX + "consumer.";

Review comment:
       These changes are going to break existing users. For example, I have connectors with a few settings prefixed with `consumer.`. I wonder if we could keep the old behaviour (even if partially broken) while adding the proper prefixes




----------------------------------------------------------------



[GitHub] [kafka] scanterog commented on pull request #9313: [mm2] Fix consumer/producer/admin properties override

Posted by GitBox <gi...@apache.org>.
scanterog commented on pull request #9313:
URL: https://github.com/apache/kafka/pull/9313#issuecomment-696259251






----------------------------------------------------------------



[GitHub] [kafka] mimaison commented on a change in pull request #9313: [mm2] Fix consumer/producer properties override

Posted by GitBox <gi...@apache.org>.
mimaison commented on a change in pull request #9313:
URL: https://github.com/apache/kafka/pull/9313#discussion_r505426022



##########
File path: connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorConnectorConfig.java
##########
@@ -199,6 +199,8 @@
 
     protected static final String SOURCE_CLUSTER_PREFIX = MirrorMakerConfig.SOURCE_CLUSTER_PREFIX;
     protected static final String TARGET_CLUSTER_PREFIX = MirrorMakerConfig.TARGET_CLUSTER_PREFIX;
+    protected static final String SOURCE_PREFIX = MirrorMakerConfig.SOURCE_PREFIX;
+    protected static final String TARGET_PREFIX = MirrorMakerConfig.TARGET_PREFIX;
     protected static final String PRODUCER_CLIENT_PREFIX = "producer.";
     protected static final String CONSUMER_CLIENT_PREFIX = "consumer.";
     protected static final String ADMIN_CLIENT_PREFIX = "admin.";

Review comment:
       We could get rid of `SOURCE_ADMIN_CLIENT_PREFIX` and `TARGET_ADMIN_CLIENT_PREFIX` and instead use `SOURCE_PREFIX + ADMIN_CLIENT_PREFIX` or `TARGET_PREFIX + ADMIN_CLIENT_PREFIX` in the methods that build the Admin Client configs, so they are similar to the Producer and Consumer methods.

##########
File path: connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java
##########
@@ -245,10 +254,17 @@ public MirrorClientConfig clientConfig(String cluster) {
         Map<String, String> strings = originalsStrings();
         strings.keySet().removeIf(x -> !x.startsWith(prefix));
         return strings;
-    } 
+    }
+
+    static Map<String, String> prefixForGeneralAttrs(String prefix, Map<String, String> props) {

Review comment:
       Can we find a better name for these 2 new methods? Something like `clusterConfigsWithPrefix()` and `clientConfigsWithPrefix()`?




----------------------------------------------------------------



[GitHub] [kafka] mimaison commented on a change in pull request #9313: [mm2] Fix consumer/producer properties override

Posted by GitBox <gi...@apache.org>.
mimaison commented on a change in pull request #9313:
URL: https://github.com/apache/kafka/pull/9313#discussion_r498298987



##########
File path: connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorConnectorConfig.java
##########
@@ -199,8 +199,8 @@
 
     protected static final String SOURCE_CLUSTER_PREFIX = MirrorMakerConfig.SOURCE_CLUSTER_PREFIX;
     protected static final String TARGET_CLUSTER_PREFIX = MirrorMakerConfig.TARGET_CLUSTER_PREFIX;
-    protected static final String PRODUCER_CLIENT_PREFIX = "producer.";
-    protected static final String CONSUMER_CLIENT_PREFIX = "consumer.";
+    protected static final String PRODUCER_CLIENT_PREFIX = SOURCE_CLUSTER_PREFIX + "producer.";
+    protected static final String CONSUMER_CLIENT_PREFIX = SOURCE_CLUSTER_PREFIX + "consumer.";

Review comment:
       Maybe not including `cluster` for all the client specific settings would be slightly better, WDYT?
   ```
   "source.cluster.bootstrap.servers": "localhost:9092",
   "source.cluster.security.protocol": "SASL_SSL",
   "source.producer.some-producer-setting": 123,
   "source.consumer.some-consumer-setting": 123,
   "source.admin.some-admin-setting": 123
   ```




----------------------------------------------------------------



[GitHub] [kafka] scanterog commented on pull request #9313: [mm2] Fix consumer/producer properties override

Posted by GitBox <gi...@apache.org>.
scanterog commented on pull request #9313:
URL: https://github.com/apache/kafka/pull/9313#issuecomment-709412233


   > Thanks @scanterog, the changes look good now. I've left a couple of very minor comments that would be nice to address. I'll be happy to merge then
   
   Thanks. There's one test failing for JDK15, apparently. It works locally with JDK11. I wish I could rerun those tests; many of them are flaky.


----------------------------------------------------------------



[GitHub] [kafka] scanterog commented on a change in pull request #9313: [mm2] Fix consumer/producer properties override

Posted by GitBox <gi...@apache.org>.
scanterog commented on a change in pull request #9313:
URL: https://github.com/apache/kafka/pull/9313#discussion_r502351930



##########
File path: connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorMakerConfigTest.java
##########
@@ -52,10 +52,10 @@ public void testClusterConfigProperties() {
             "replication.factor", "4"));
         Map<String, String> connectorProps = mirrorConfig.connectorBaseConfig(new SourceAndTarget("a", "b"),
             MirrorSourceConnector.class);
-        assertEquals("source.cluster.bootstrap.servers is set",
-            "servers-one", connectorProps.get("source.cluster.bootstrap.servers"));
-        assertEquals("target.cluster.bootstrap.servers is set",
-            "servers-two", connectorProps.get("target.cluster.bootstrap.servers"));
+        assertEquals("source.bootstrap.servers is set",

Review comment:
       That will require more changes. This does not break the main config for the MirrorMaker driver. I guess it does break on Connect, though.







[GitHub] [kafka] scanterog commented on a change in pull request #9313: [mm2] Fix consumer/producer properties override

Posted by GitBox <gi...@apache.org>.
scanterog commented on a change in pull request #9313:
URL: https://github.com/apache/kafka/pull/9313#discussion_r495871017



##########
File path: connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorConnectorConfig.java
##########
@@ -199,8 +199,8 @@
 
     protected static final String SOURCE_CLUSTER_PREFIX = MirrorMakerConfig.SOURCE_CLUSTER_PREFIX;
     protected static final String TARGET_CLUSTER_PREFIX = MirrorMakerConfig.TARGET_CLUSTER_PREFIX;
-    protected static final String PRODUCER_CLIENT_PREFIX = "producer.";
-    protected static final String CONSUMER_CLIENT_PREFIX = "consumer.";
+    protected static final String PRODUCER_CLIENT_PREFIX = SOURCE_CLUSTER_PREFIX + "producer.";
+    protected static final String CONSUMER_CLIENT_PREFIX = SOURCE_CLUSTER_PREFIX + "consumer.";

Review comment:
       I'm not sure I follow. The `source.cluster` prefix is not user specified; MM2 adds it somewhere else (I can find the lines later if you want). The config specified [here](https://github.com/apache/kafka/tree/trunk/connect/mirror#producer--consumer--admin-config-used-by-mm2) will actually be honored, so this does not imply changes on the MirrorMaker config side at least. Are you talking about the case of running MirrorMaker in a Connect cluster rather than as a dedicated cluster? I didn't check that one, TBH. Maybe @ryannedolan has more insight on this one?
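
To illustrate the prefix issue discussed in this thread, here is a minimal sketch. The class `PrefixOverrideSketch` and its `withPrefix` helper are hypothetical stand-ins written for this example, not Kafka code; they only mimic the idea of selecting keys by prefix and stripping that prefix. The property keys are assumptions modeled on the example in the PR description.

```java
import java.util.HashMap;
import java.util.Map;

public class PrefixOverrideSketch {

    // Hypothetical helper: keep only the entries whose key starts with
    // the given prefix, and strip that prefix from the returned keys.
    static Map<String, String> withPrefix(Map<String, String> originals, String prefix) {
        Map<String, String> result = new HashMap<>();
        for (Map.Entry<String, String> entry : originals.entrySet()) {
            String key = entry.getKey();
            if (key.startsWith(prefix) && key.length() > prefix.length()) {
                result.put(key.substring(prefix.length()), entry.getValue());
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Assumed shape of the connector's original properties: every key
        // already carries the cluster prefix that MM2 adds internally.
        Map<String, String> originals = new HashMap<>();
        originals.put("source.cluster.bootstrap.servers", "localhost:9092");
        originals.put("source.cluster.consumer.auto.offset.reset", "latest");

        // Looking up the bare "consumer." prefix matches nothing.
        System.out.println(withPrefix(originals, "consumer."));

        // Looking up the cluster-qualified prefix finds the override.
        System.out.println(withPrefix(originals, "source.cluster.consumer."));
    }
}
```

Under these assumptions the first lookup returns an empty map and the second returns the `auto.offset.reset` override, which is why the diff above qualifies the client prefixes with the cluster prefix.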



