You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by "C0urante (via GitHub)" <gi...@apache.org> on 2023/01/20 18:46:40 UTC

[GitHub] [kafka] C0urante opened a new pull request, #13137: KAFKA-15086: Intra-cluster communication for MirrorMaker 2

C0urante opened a new pull request, #13137:
URL: https://github.com/apache/kafka/pull/13137

   [Jira](https://issues.apache.org/jira/browse/KAFKA-10586)
   
   Implements the internal REST API changes described in [KIP-710](https://cwiki.apache.org/confluence/display/KAFKA/KIP-710%3A+Full+support+for+distributed+mode+in+dedicated+MirrorMaker+2.0+clusters).
   
   A lot of the diff here comes from shuffling existing parts of the code base into new parts, without significantly altering them. This is necessary in order to make the current REST API logic for Kafka Connect more extensible and reusable, which in turn allows us to add logic for the dedicated Mirror Maker 2 REST API with less effort.
   
   Shuffled but not extensively rewritten portions include:
   - `WorkerConfig` properties being extracted into the new `RestServerConfig` class
   - Pulling out some REST forwarding logic previously internal to the `ConnectorsResource` class into the new `HerderRequestHandler` class
   - Pulling out all REST logic for internal endpoints from the `ConnectorsResource` class into the new `InternalClusterResource` class
   
   These changes should not affect the docs generated by, e.g., the `DistributedConfig` class, but should allow other `AbstractConfig` classes to easily add properties related to the (public- or internal-facing) Connect REST API.
   
   An integration test is also added that runs a multi-node, dedicated Mirror Maker 2 cluster with exactly-once support enabled in order to test out both code paths related to intra-cluster communication.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] C0urante commented on a diff in pull request #13137: KAFKA-15086: Intra-cluster communication for Mirror Maker 2

Posted by "C0urante (via GitHub)" <gi...@apache.org>.
C0urante commented on code in PR #13137:
URL: https://github.com/apache/kafka/pull/13137#discussion_r1100587361


##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/rest/resources/InternalMirrorResource.java:
##########
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.mirror.rest.resources;
+
+import org.apache.kafka.connect.mirror.SourceAndTarget;
+import org.apache.kafka.connect.runtime.Herder;
+import org.apache.kafka.connect.runtime.rest.RestClient;
+import org.apache.kafka.connect.runtime.rest.resources.InternalClusterResource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.ws.rs.NotFoundException;
+import javax.ws.rs.Path;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.UriInfo;
+import java.util.List;
+import java.util.Map;
+
+@Path("/{source}/{target}/connectors")
+public class InternalMirrorResource extends InternalClusterResource {
+
+    @Context
+    private UriInfo uriInfo;
+
+    private static final Logger log = LoggerFactory.getLogger(InternalMirrorResource.class);
+
+    private final Map<SourceAndTarget, Herder> herders;
+
+    public InternalMirrorResource(Map<SourceAndTarget, Herder> herders, RestClient restClient) {
+        super(restClient);
+        this.herders = herders;
+    }
+
+    @Override
+    protected Herder herderForRequest() {
+        String source = pathParam("source");
+        String target = pathParam("target");
+        Herder result = herders.get(new SourceAndTarget(source, target));
+        if (result == null) {
+            log.debug("Failed to find herder for source '{}' and target '{}'", source, target);

Review Comment:
   I was under the impression that throwing this would only cause the exception message to be present in the REST response (and presumably the logs for the worker that issued the request), but after revisiting the `ConnectExceptionMapper` class, it appears that we'll also log the exception at debug level on this worker. So yep, we can remove this 👍



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] C0urante commented on a diff in pull request #13137: KAFKA-15086: Intra-cluster communication for Mirror Maker 2

Posted by "C0urante (via GitHub)" <gi...@apache.org>.
C0urante commented on code in PR #13137:
URL: https://github.com/apache/kafka/pull/13137#discussion_r1100585212


##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/rest/resources/InternalMirrorResource.java:
##########
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.mirror.rest.resources;
+
+import org.apache.kafka.connect.mirror.SourceAndTarget;
+import org.apache.kafka.connect.runtime.Herder;
+import org.apache.kafka.connect.runtime.rest.RestClient;
+import org.apache.kafka.connect.runtime.rest.resources.InternalClusterResource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.ws.rs.NotFoundException;
+import javax.ws.rs.Path;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.UriInfo;
+import java.util.List;
+import java.util.Map;
+
+@Path("/{source}/{target}/connectors")
+public class InternalMirrorResource extends InternalClusterResource {
+
+    @Context
+    private UriInfo uriInfo;
+
+    private static final Logger log = LoggerFactory.getLogger(InternalMirrorResource.class);
+
+    private final Map<SourceAndTarget, Herder> herders;
+
+    public InternalMirrorResource(Map<SourceAndTarget, Herder> herders, RestClient restClient) {
+        super(restClient);
+        this.herders = herders;
+    }
+
+    @Override
+    protected Herder herderForRequest() {
+        String source = pathParam("source");
+        String target = pathParam("target");
+        Herder result = herders.get(new SourceAndTarget(source, target));
+        if (result == null) {
+            log.debug("Failed to find herder for source '{}' and target '{}'", source, target);
+            throw new NotFoundException("No replication flow found for source '" + source + "' and target '" + target + "'");
+        }
+        return result;
+    }
+
+    private String pathParam(String name) {
+        List<String> result = uriInfo.getPathParameters().get(name);
+        if (result == null || result.isEmpty())
+            throw new NotFoundException();

Review Comment:
   Yeah, couldn't hurt. This is almost guaranteed to end up in a log file on another worker so in the unlikely event that this code path is somehow reached, more detail would be preferable to less.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] C0urante commented on a diff in pull request #13137: KAFKA-15086: Intra-cluster communication for Mirror Maker 2

Posted by "C0urante (via GitHub)" <gi...@apache.org>.
C0urante commented on code in PR #13137:
URL: https://github.com/apache/kafka/pull/13137#discussion_r1090935837


##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMaker.java:
##########
@@ -255,13 +287,26 @@ private void addHerder(SourceAndTarget sourceAndTarget) {
         // Pass the shared admin to the distributed herder as an additional AutoCloseable object that should be closed when the
         // herder is stopped. MirrorMaker has multiple herders, and having the herder own the close responsibility is much easier than
         // tracking the various shared admin objects in this class.
-        // Do not provide a restClient to the DistributedHerder to indicate that request forwarding is disabled
         Herder herder = new DistributedHerder(distributedConfig, time, worker,
                 kafkaClusterId, statusBackingStore, configBackingStore,
-                advertisedUrl, null, CLIENT_CONFIG_OVERRIDE_POLICY, sharedAdmin);
+                advertisedUrl, restClient, CLIENT_CONFIG_OVERRIDE_POLICY,
+                restNamespace, sharedAdmin);
         herders.put(sourceAndTarget, herder);
     }
 
+    private static String encodePath(String rawPath) throws UnsupportedEncodingException {
+        return URLEncoder.encode(rawPath, StandardCharsets.UTF_8.name())
+                // Java's out-of-the-box URL encoder encodes spaces (' ') as pluses ('+'),
+                // and pluses as '%2B'
+                // But Jetty doesn't decode pluses at all and leaves them as-are in decoded
+                // URLs
+                // So to get around that, we replace pluses in the encoded URL here with '%20',
+                // which is the encoding that Jetty expects for spaces
+                // Jetty will reverse this transformation when evaluating the path parameters
+                // and will return decoded strings with all special characters as they were.

Review Comment:
   It's the result of fitting a square peg (Java's `URLEncoder` class, which, despite the name, is designed for HTML form encoding instead of URL path encoding) into a round hole (URL path encoding). I couldn't find a better alternative than this, and considering the fairly low risk and integration test coverage, figured it'd be good enough for now.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] viktorsomogyi commented on a diff in pull request #13137: KAFKA-15086: Intra-cluster communication for Mirror Maker 2

Posted by "viktorsomogyi (via GitHub)" <gi...@apache.org>.
viktorsomogyi commented on code in PR #13137:
URL: https://github.com/apache/kafka/pull/13137#discussion_r1087986328


##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMaker.java:
##########
@@ -119,7 +126,16 @@ public class MirrorMaker {
     public MirrorMaker(MirrorMakerConfig config, List<String> clusters, Time time) {
         log.debug("Kafka MirrorMaker instance created");
         this.time = time;
-        this.advertisedBaseUrl = "NOTUSED";
+        if (config.enableInternalRest()) {
+            this.restClient = new RestClient(config);
+            internalServer = new MirrorRestServer(config.originals(), restClient);
+            internalServer.initializeServer();
+            this.advertisedUrl = internalServer.advertisedUrl().toString();
+        } else {
+            internalServer = null;
+            restClient = null;
+            this.advertisedUrl = "NOTUSED";

Review Comment:
   What prevents this from being null or Optional<String>?



##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMaker.java:
##########
@@ -255,13 +287,26 @@ private void addHerder(SourceAndTarget sourceAndTarget) {
         // Pass the shared admin to the distributed herder as an additional AutoCloseable object that should be closed when the
         // herder is stopped. MirrorMaker has multiple herders, and having the herder own the close responsibility is much easier than
         // tracking the various shared admin objects in this class.
-        // Do not provide a restClient to the DistributedHerder to indicate that request forwarding is disabled
         Herder herder = new DistributedHerder(distributedConfig, time, worker,
                 kafkaClusterId, statusBackingStore, configBackingStore,
-                advertisedUrl, null, CLIENT_CONFIG_OVERRIDE_POLICY, sharedAdmin);
+                advertisedUrl, restClient, CLIENT_CONFIG_OVERRIDE_POLICY,
+                restNamespace, sharedAdmin);
         herders.put(sourceAndTarget, herder);
     }
 
+    private static String encodePath(String rawPath) throws UnsupportedEncodingException {
+        return URLEncoder.encode(rawPath, StandardCharsets.UTF_8.name())
+                // Java's out-of-the-box URL encoder encodes spaces (' ') as pluses ('+'),
+                // and pluses as '%2B'
+                // But Jetty doesn't decode pluses at all and leaves them as-are in decoded
+                // URLs
+                // So to get around that, we replace pluses in the encoded URL here with '%20',
+                // which is the encoding that Jetty expects for spaces
+                // Jetty will reverse this transformation when evaluating the path parameters
+                // and will return decoded strings with all special characters as they were.

Review Comment:
   Is this a Jetty bug or just some loophole in URL encoding in general?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] C0urante commented on a diff in pull request #13137: KAFKA-15086: Intra-cluster communication for Mirror Maker 2

Posted by "C0urante (via GitHub)" <gi...@apache.org>.
C0urante commented on code in PR #13137:
URL: https://github.com/apache/kafka/pull/13137#discussion_r1090935837


##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMaker.java:
##########
@@ -255,13 +287,26 @@ private void addHerder(SourceAndTarget sourceAndTarget) {
         // Pass the shared admin to the distributed herder as an additional AutoCloseable object that should be closed when the
         // herder is stopped. MirrorMaker has multiple herders, and having the herder own the close responsibility is much easier than
         // tracking the various shared admin objects in this class.
-        // Do not provide a restClient to the DistributedHerder to indicate that request forwarding is disabled
         Herder herder = new DistributedHerder(distributedConfig, time, worker,
                 kafkaClusterId, statusBackingStore, configBackingStore,
-                advertisedUrl, null, CLIENT_CONFIG_OVERRIDE_POLICY, sharedAdmin);
+                advertisedUrl, restClient, CLIENT_CONFIG_OVERRIDE_POLICY,
+                restNamespace, sharedAdmin);
         herders.put(sourceAndTarget, herder);
     }
 
+    private static String encodePath(String rawPath) throws UnsupportedEncodingException {
+        return URLEncoder.encode(rawPath, StandardCharsets.UTF_8.name())
+                // Java's out-of-the-box URL encoder encodes spaces (' ') as pluses ('+'),
+                // and pluses as '%2B'
+                // But Jetty doesn't decode pluses at all and leaves them as-are in decoded
+                // URLs
+                // So to get around that, we replace pluses in the encoded URL here with '%20',
+                // which is the encoding that Jetty expects for spaces
+                // Jetty will reverse this transformation when evaluating the path parameters
+                // and will return decoded strings with all special characters as they were.

Review Comment:
   It's the result of fitting a square peg (Java's `URLEncoder` class, which, despite the name, is designed for HTML form encoding instead of URL path encoding) into a round hole (URL path encoding). I couldn't find a better alternative than this, and considering the fairly low risk (this is all intended to cover a pretty niche edge case) and integration test coverage, figured it'd be good enough for now.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] mimaison commented on a diff in pull request #13137: KAFKA-15086: Intra-cluster communication for Mirror Maker 2

Posted by "mimaison (via GitHub)" <gi...@apache.org>.
mimaison commented on code in PR #13137:
URL: https://github.com/apache/kafka/pull/13137#discussion_r1100386611


##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java:
##########
@@ -80,18 +81,25 @@ public class MirrorMakerConfig extends AbstractConfig {
     static final String TARGET_CLUSTER_PREFIX = "target.cluster.";
     static final String SOURCE_PREFIX = "source.";
     static final String TARGET_PREFIX = "target.";
+    static final String ENABLE_INTERNAL_REST_CONFIG = "dedicated.mode.enable.internal.rest";
+    private static final String ENABLE_INTERNAL_REST_DOC =
+            "Whether to bring up an internal-only REST server that allows multi-node clusters to operate correctly";

Review Comment:
   Nit: We usually have a period at the end of descriptions.



##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/rest/resources/InternalMirrorResource.java:
##########
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.mirror.rest.resources;
+
+import org.apache.kafka.connect.mirror.SourceAndTarget;
+import org.apache.kafka.connect.runtime.Herder;
+import org.apache.kafka.connect.runtime.rest.RestClient;
+import org.apache.kafka.connect.runtime.rest.resources.InternalClusterResource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.ws.rs.NotFoundException;
+import javax.ws.rs.Path;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.UriInfo;
+import java.util.List;
+import java.util.Map;
+
+@Path("/{source}/{target}/connectors")
+public class InternalMirrorResource extends InternalClusterResource {
+
+    @Context
+    private UriInfo uriInfo;
+
+    private static final Logger log = LoggerFactory.getLogger(InternalMirrorResource.class);
+
+    private final Map<SourceAndTarget, Herder> herders;
+
+    public InternalMirrorResource(Map<SourceAndTarget, Herder> herders, RestClient restClient) {
+        super(restClient);
+        this.herders = herders;
+    }
+
+    @Override
+    protected Herder herderForRequest() {
+        String source = pathParam("source");
+        String target = pathParam("target");
+        Herder result = herders.get(new SourceAndTarget(source, target));
+        if (result == null) {
+            log.debug("Failed to find herder for source '{}' and target '{}'", source, target);
+            throw new NotFoundException("No replication flow found for source '" + source + "' and target '" + target + "'");
+        }
+        return result;
+    }
+
+    private String pathParam(String name) {
+        List<String> result = uriInfo.getPathParameters().get(name);
+        if (result == null || result.isEmpty())
+            throw new NotFoundException();

Review Comment:
   Should we add a message here?



##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/rest/resources/InternalMirrorResource.java:
##########
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.mirror.rest.resources;
+
+import org.apache.kafka.connect.mirror.SourceAndTarget;
+import org.apache.kafka.connect.runtime.Herder;
+import org.apache.kafka.connect.runtime.rest.RestClient;
+import org.apache.kafka.connect.runtime.rest.resources.InternalClusterResource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.ws.rs.NotFoundException;
+import javax.ws.rs.Path;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.UriInfo;
+import java.util.List;
+import java.util.Map;
+
+@Path("/{source}/{target}/connectors")
+public class InternalMirrorResource extends InternalClusterResource {
+
+    @Context
+    private UriInfo uriInfo;
+
+    private static final Logger log = LoggerFactory.getLogger(InternalMirrorResource.class);
+
+    private final Map<SourceAndTarget, Herder> herders;
+
+    public InternalMirrorResource(Map<SourceAndTarget, Herder> herders, RestClient restClient) {
+        super(restClient);
+        this.herders = herders;
+    }
+
+    @Override
+    protected Herder herderForRequest() {
+        String source = pathParam("source");
+        String target = pathParam("target");
+        Herder result = herders.get(new SourceAndTarget(source, target));
+        if (result == null) {
+            log.debug("Failed to find herder for source '{}' and target '{}'", source, target);
+            throw new NotFoundException("No replication flow found for source '" + source + "' and target '" + target + "'");
+        }
+        return result;
+    }
+
+    private String pathParam(String name) {
+        List<String> result = uriInfo.getPathParameters().get(name);

Review Comment:
   Could we use `getFirst()` and not have to check whether result is empty or do `get(0)`.



##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/rest/resources/InternalMirrorResource.java:
##########
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.mirror.rest.resources;
+
+import org.apache.kafka.connect.mirror.SourceAndTarget;
+import org.apache.kafka.connect.runtime.Herder;
+import org.apache.kafka.connect.runtime.rest.RestClient;
+import org.apache.kafka.connect.runtime.rest.resources.InternalClusterResource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.ws.rs.NotFoundException;
+import javax.ws.rs.Path;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.UriInfo;
+import java.util.List;
+import java.util.Map;
+
+@Path("/{source}/{target}/connectors")
+public class InternalMirrorResource extends InternalClusterResource {
+
+    @Context
+    private UriInfo uriInfo;
+
+    private static final Logger log = LoggerFactory.getLogger(InternalMirrorResource.class);
+
+    private final Map<SourceAndTarget, Herder> herders;
+
+    public InternalMirrorResource(Map<SourceAndTarget, Herder> herders, RestClient restClient) {
+        super(restClient);
+        this.herders = herders;
+    }
+
+    @Override
+    protected Herder herderForRequest() {
+        String source = pathParam("source");
+        String target = pathParam("target");
+        Herder result = herders.get(new SourceAndTarget(source, target));
+        if (result == null) {
+            log.debug("Failed to find herder for source '{}' and target '{}'", source, target);

Review Comment:
   Since we throw below, do we need this debug line?



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java:
##########
@@ -210,44 +200,47 @@ public void initializeServer() {
         }
 
         log.info("REST server listening at " + jettyServer.getURI() + ", advertising URL " + advertisedUrl());
-        log.info("REST admin endpoints at " + adminUrl());
+        URI adminUrl = adminUrl();
+        if (adminUrl != null)
+            log.info("REST admin endpoints at " + adminUrl);
     }
 
-    public void initializeResources(Herder herder) {
+    protected final void initializeResources() {
         log.info("Initializing REST resources");
+        this.resources = new ArrayList<>();

Review Comment:
   nit: slightly more readable without `this.` here



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] C0urante commented on a diff in pull request #13137: KAFKA-15086: Intra-cluster communication for Mirror Maker 2

Posted by "C0urante (via GitHub)" <gi...@apache.org>.
C0urante commented on code in PR #13137:
URL: https://github.com/apache/kafka/pull/13137#discussion_r1090943031


##########
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/DedicatedMirrorIntegrationTest.java:
##########
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.mirror.integration;
+
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.connect.mirror.MirrorMaker;
+import org.apache.kafka.connect.util.clusters.EmbeddedKafkaCluster;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.apache.kafka.clients.consumer.ConsumerConfig.AUTO_OFFSET_RESET_CONFIG;
+import static org.apache.kafka.test.TestUtils.waitForCondition;
+
+@Tag("integration")
+public class DedicatedMirrorIntegrationTest {
+
+    private static final Logger log = LoggerFactory.getLogger(DedicatedMirrorIntegrationTest.class);
+
+    private static final int TOPIC_CREATION_TIMEOUT_MS = 30_000;
+    private static final int TOPIC_REPLICATION_TIMEOUT_MS = 30_000;
+
+    private Map<String, EmbeddedKafkaCluster> kafkaClusters;
+    private Map<String, MirrorMaker> mirrorMakers;
+
+    @BeforeEach
+    public void setup() {
+        kafkaClusters = new HashMap<>();
+        mirrorMakers = new HashMap<>();
+    }
+
+    @AfterEach
+    public void teardown() throws Throwable {
+        AtomicReference<Throwable> shutdownFailure = new AtomicReference<>();
+        mirrorMakers.forEach((name, mirrorMaker) ->
+            Utils.closeQuietly(mirrorMaker::stop, "MirrorMaker worker '" + name + "'", shutdownFailure)
+        );
+        kafkaClusters.forEach((name, kafkaCluster) ->
+            Utils.closeQuietly(kafkaCluster::stop, "Embedded Kafka cluster '" + name + "'", shutdownFailure)
+        );
+        if (shutdownFailure.get() != null) {
+            throw shutdownFailure.get();
+        }
+    }
+
+    private EmbeddedKafkaCluster startKafkaCluster(String name, int numBrokers, Properties brokerProperties) {
+        if (kafkaClusters.containsKey(name))
+            throw new IllegalStateException("Cannot register multiple Kafka clusters with the same name");
+
+        EmbeddedKafkaCluster result = new EmbeddedKafkaCluster(numBrokers, brokerProperties);
+        kafkaClusters.put(name, result);
+
+        result.start();
+
+        return result;
+    }
+
+    private MirrorMaker startMirrorMaker(String name, Map<String, String> mmProps) {
+        if (mirrorMakers.containsKey(name))
+            throw new IllegalStateException("Cannot register multiple MirrorMaker nodes with the same name");
+
+        MirrorMaker result = new MirrorMaker(mmProps);
+        mirrorMakers.put(name, result);
+
+        result.start();
+
+        return result;
+    }
+
+    /**
+     * Test that a multi-node dedicated cluster is able to dynamically detect new topics at runtime

Review Comment:
   We don't have a guarantee that task configs are relayed from one worker to another, that's true. However, because we enable exactly-once support, we do get a guarantee that intra-cluster communication takes place, since no source tasks can start on a follower node without first issuing a REST request to the cluster's leader.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] C0urante commented on pull request #13137: KAFKA-15086: Intra-cluster communication for Mirror Maker 2

Posted by "C0urante (via GitHub)" <gi...@apache.org>.
C0urante commented on PR #13137:
URL: https://github.com/apache/kafka/pull/13137#issuecomment-1398797302

   @gharris1727 @viktorsomogyi @mimaison would you mind taking a look at this when you have a moment?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] urbandan commented on a diff in pull request #13137: KAFKA-15086: Intra-cluster communication for Mirror Maker 2

Posted by "urbandan (via GitHub)" <gi...@apache.org>.
urbandan commented on code in PR #13137:
URL: https://github.com/apache/kafka/pull/13137#discussion_r1091577828


##########
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/DedicatedMirrorIntegrationTest.java:
##########
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.mirror.integration;
+
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.connect.mirror.MirrorMaker;
+import org.apache.kafka.connect.util.clusters.EmbeddedKafkaCluster;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.apache.kafka.clients.consumer.ConsumerConfig.AUTO_OFFSET_RESET_CONFIG;
+import static org.apache.kafka.test.TestUtils.waitForCondition;
+
+@Tag("integration")
+public class DedicatedMirrorIntegrationTest {
+
+    private static final Logger log = LoggerFactory.getLogger(DedicatedMirrorIntegrationTest.class);
+
+    private static final int TOPIC_CREATION_TIMEOUT_MS = 30_000;
+    private static final int TOPIC_REPLICATION_TIMEOUT_MS = 30_000;
+
+    private Map<String, EmbeddedKafkaCluster> kafkaClusters;
+    private Map<String, MirrorMaker> mirrorMakers;
+
+    @BeforeEach
+    public void setup() {
+        kafkaClusters = new HashMap<>();
+        mirrorMakers = new HashMap<>();
+    }
+
+    @AfterEach
+    public void teardown() throws Throwable {
+        AtomicReference<Throwable> shutdownFailure = new AtomicReference<>();
+        mirrorMakers.forEach((name, mirrorMaker) ->
+            Utils.closeQuietly(mirrorMaker::stop, "MirrorMaker worker '" + name + "'", shutdownFailure)
+        );
+        kafkaClusters.forEach((name, kafkaCluster) ->
+            Utils.closeQuietly(kafkaCluster::stop, "Embedded Kafka cluster '" + name + "'", shutdownFailure)
+        );
+        if (shutdownFailure.get() != null) {
+            throw shutdownFailure.get();
+        }
+    }
+
+    private EmbeddedKafkaCluster startKafkaCluster(String name, int numBrokers, Properties brokerProperties) {
+        if (kafkaClusters.containsKey(name))
+            throw new IllegalStateException("Cannot register multiple Kafka clusters with the same name");
+
+        EmbeddedKafkaCluster result = new EmbeddedKafkaCluster(numBrokers, brokerProperties);
+        kafkaClusters.put(name, result);
+
+        result.start();
+
+        return result;
+    }
+
+    private MirrorMaker startMirrorMaker(String name, Map<String, String> mmProps) {
+        if (mirrorMakers.containsKey(name))
+            throw new IllegalStateException("Cannot register multiple MirrorMaker nodes with the same name");
+
+        MirrorMaker result = new MirrorMaker(mmProps);
+        mirrorMakers.put(name, result);
+
+        result.start();
+
+        return result;
+    }
+
+    /**
+     * Test that a multi-node dedicated cluster is able to dynamically detect new topics at runtime

Review Comment:
   Thanks for the clarification. I think using the EOS endpoints should be enough, as I don't really have any ideas on how to make sure that the task config update is triggered.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] C0urante merged pull request #13137: KAFKA-15086, KAFKA-9981: Intra-cluster communication for Mirror Maker 2

Posted by "C0urante (via GitHub)" <gi...@apache.org>.
C0urante merged PR #13137:
URL: https://github.com/apache/kafka/pull/13137


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] C0urante commented on pull request #13137: KAFKA-15086, KAFKA-9981: Intra-cluster communication for Mirror Maker 2

Posted by "C0urante (via GitHub)" <gi...@apache.org>.
C0urante commented on PR #13137:
URL: https://github.com/apache/kafka/pull/13137#issuecomment-1424428493

   Whoops--there was a typo in the title at the time of merge. It should refer to [KAFKA-10586](https://issues.apache.org/jira/browse/KAFKA-10586), not KAFKA-15086.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] C0urante commented on pull request #13137: KAFKA-15086: Intra-cluster communication for Mirror Maker 2

Posted by "C0urante (via GitHub)" <gi...@apache.org>.
C0urante commented on PR #13137:
URL: https://github.com/apache/kafka/pull/13137#issuecomment-1423159660

   Thanks Mickael, I've addressed your suggestions and rebased.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] C0urante commented on a diff in pull request #13137: KAFKA-15086: Intra-cluster communication for Mirror Maker 2

Posted by "C0urante (via GitHub)" <gi...@apache.org>.
C0urante commented on code in PR #13137:
URL: https://github.com/apache/kafka/pull/13137#discussion_r1090936585


##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMaker.java:
##########
@@ -119,7 +126,16 @@ public class MirrorMaker {
     public MirrorMaker(MirrorMakerConfig config, List<String> clusters, Time time) {
         log.debug("Kafka MirrorMaker instance created");
         this.time = time;
-        this.advertisedBaseUrl = "NOTUSED";
+        if (config.enableInternalRest()) {
+            this.restClient = new RestClient(config);
+            internalServer = new MirrorRestServer(config.originals(), restClient);
+            internalServer.initializeServer();
+            this.advertisedUrl = internalServer.advertisedUrl().toString();
+        } else {
+            internalServer = null;
+            restClient = null;
+            this.advertisedUrl = "NOTUSED";

Review Comment:
   It ends up being used in the metadata workers send to the group coordinator during rebalances, and the schema we use to (de)serialize that data requires a non-null string for the worker URL. I stuck with the `"NOTUSED"` placeholder instead of an empty string since that's what we're using currently, and it doesn't seem worth the risk to try to change it now.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] viktorsomogyi commented on a diff in pull request #13137: KAFKA-15086: Intra-cluster communication for Mirror Maker 2

Posted by "viktorsomogyi (via GitHub)" <gi...@apache.org>.
viktorsomogyi commented on code in PR #13137:
URL: https://github.com/apache/kafka/pull/13137#discussion_r1100211704


##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMaker.java:
##########
@@ -255,13 +287,26 @@ private void addHerder(SourceAndTarget sourceAndTarget) {
         // Pass the shared admin to the distributed herder as an additional AutoCloseable object that should be closed when the
         // herder is stopped. MirrorMaker has multiple herders, and having the herder own the close responsibility is much easier than
         // tracking the various shared admin objects in this class.
-        // Do not provide a restClient to the DistributedHerder to indicate that request forwarding is disabled
         Herder herder = new DistributedHerder(distributedConfig, time, worker,
                 kafkaClusterId, statusBackingStore, configBackingStore,
-                advertisedUrl, null, CLIENT_CONFIG_OVERRIDE_POLICY, sharedAdmin);
+                advertisedUrl, restClient, CLIENT_CONFIG_OVERRIDE_POLICY,
+                restNamespace, sharedAdmin);
         herders.put(sourceAndTarget, herder);
     }
 
+    private static String encodePath(String rawPath) throws UnsupportedEncodingException {
+        return URLEncoder.encode(rawPath, StandardCharsets.UTF_8.name())
+                // Java's out-of-the-box URL encoder encodes spaces (' ') as pluses ('+'),
+                // and pluses as '%2B'
+                // But Jetty doesn't decode pluses at all and leaves them as-are in decoded
+                // URLs
+                // So to get around that, we replace pluses in the encoded URL here with '%20',
+                // which is the encoding that Jetty expects for spaces
+                // Jetty will reverse this transformation when evaluating the path parameters
+                // and will return decoded strings with all special characters as they were.

Review Comment:
   Ok, I'm fine with it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] gharris1727 commented on a diff in pull request #13137: KAFKA-15086: Intra-cluster communication for Mirror Maker 2

Posted by "gharris1727 (via GitHub)" <gi...@apache.org>.
gharris1727 commented on code in PR #13137:
URL: https://github.com/apache/kafka/pull/13137#discussion_r1084435808


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java:
##########
@@ -228,12 +229,17 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
      * @param kafkaClusterId     the identifier of the Kafka cluster to use for internal topics; may not be null
      * @param statusBackingStore the backing store for statuses; may not be null
      * @param configBackingStore the backing store for connector configurations; may not be null
-     * @param restUrl            the URL of this herder's REST API; may not be null
+     * @param restUrl            the URL of this herder's REST API; may not be null, but may be an arbitrary placeholder
+     *                           value if this worker does not expose a REST API
+     * @param restClient         a REST client that can be used to issue requests to other workers in the cluster; may
+     *                           be null if inter-worker communication is not enabled
      * @param connectorClientConfigOverridePolicy the policy specifying the client configuration properties that may be overridden
      *                                            in connector configurations; may not be null
+     * @param restNamespace      zero or more path elements to prepend to the paths of forwarded REST requests; may be empty, but not null
      * @param uponShutdown       any {@link AutoCloseable} objects that should be closed when this herder is {@link #stop() stopped},
      *                           after all services and resources owned by this herder are stopped
      */
+    // TODO: Do we really need two separate public constructors?

Review Comment:
   interesting, the for-testing constructor appears unused, and has been for as long as it's existed (ever since #321).
   We can push this refactor out to a different PR.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] C0urante commented on a diff in pull request #13137: KAFKA-15086: Intra-cluster communication for Mirror Maker 2

Posted by "C0urante (via GitHub)" <gi...@apache.org>.
C0urante commented on code in PR #13137:
URL: https://github.com/apache/kafka/pull/13137#discussion_r1084460160


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java:
##########
@@ -228,12 +229,17 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
      * @param kafkaClusterId     the identifier of the Kafka cluster to use for internal topics; may not be null
      * @param statusBackingStore the backing store for statuses; may not be null
      * @param configBackingStore the backing store for connector configurations; may not be null
-     * @param restUrl            the URL of this herder's REST API; may not be null
+     * @param restUrl            the URL of this herder's REST API; may not be null, but may be an arbitrary placeholder
+     *                           value if this worker does not expose a REST API
+     * @param restClient         a REST client that can be used to issue requests to other workers in the cluster; may
+     *                           be null if inter-worker communication is not enabled
      * @param connectorClientConfigOverridePolicy the policy specifying the client configuration properties that may be overridden
      *                                            in connector configurations; may not be null
+     * @param restNamespace      zero or more path elements to prepend to the paths of forwarded REST requests; may be empty, but not null
      * @param uponShutdown       any {@link AutoCloseable} objects that should be closed when this herder is {@link #stop() stopped},
      *                           after all services and resources owned by this herder are stopped
      */
+    // TODO: Do we really need two separate public constructors?

Review Comment:
   Blegh, this comment was left in from an earlier draft where there was an additional constructor. I refactored it out before opening the PR, but apparently forgot to remove the comment. Took it out now, thanks for catching!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] urbandan commented on a diff in pull request #13137: KAFKA-15086: Intra-cluster communication for Mirror Maker 2

Posted by "urbandan (via GitHub)" <gi...@apache.org>.
urbandan commented on code in PR #13137:
URL: https://github.com/apache/kafka/pull/13137#discussion_r1086404484


##########
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/DedicatedMirrorIntegrationTest.java:
##########
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.mirror.integration;
+
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.connect.mirror.MirrorMaker;
+import org.apache.kafka.connect.util.clusters.EmbeddedKafkaCluster;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.apache.kafka.clients.consumer.ConsumerConfig.AUTO_OFFSET_RESET_CONFIG;
+import static org.apache.kafka.test.TestUtils.waitForCondition;
+
+@Tag("integration")
+public class DedicatedMirrorIntegrationTest {
+
+    private static final Logger log = LoggerFactory.getLogger(DedicatedMirrorIntegrationTest.class);
+
+    private static final int TOPIC_CREATION_TIMEOUT_MS = 30_000;
+    private static final int TOPIC_REPLICATION_TIMEOUT_MS = 30_000;
+
+    private Map<String, EmbeddedKafkaCluster> kafkaClusters;
+    private Map<String, MirrorMaker> mirrorMakers;
+
+    @BeforeEach
+    public void setup() {
+        kafkaClusters = new HashMap<>();
+        mirrorMakers = new HashMap<>();
+    }
+
+    @AfterEach
+    public void teardown() throws Throwable {
+        AtomicReference<Throwable> shutdownFailure = new AtomicReference<>();
+        mirrorMakers.forEach((name, mirrorMaker) ->
+            Utils.closeQuietly(mirrorMaker::stop, "MirrorMaker worker '" + name + "'", shutdownFailure)
+        );
+        kafkaClusters.forEach((name, kafkaCluster) ->
+            Utils.closeQuietly(kafkaCluster::stop, "Embedded Kafka cluster '" + name + "'", shutdownFailure)
+        );
+        if (shutdownFailure.get() != null) {
+            throw shutdownFailure.get();
+        }
+    }
+
+    private EmbeddedKafkaCluster startKafkaCluster(String name, int numBrokers, Properties brokerProperties) {
+        if (kafkaClusters.containsKey(name))
+            throw new IllegalStateException("Cannot register multiple Kafka clusters with the same name");
+
+        EmbeddedKafkaCluster result = new EmbeddedKafkaCluster(numBrokers, brokerProperties);
+        kafkaClusters.put(name, result);
+
+        result.start();
+
+        return result;
+    }
+
+    private MirrorMaker startMirrorMaker(String name, Map<String, String> mmProps) {
+        if (mirrorMakers.containsKey(name))
+            throw new IllegalStateException("Cannot register multiple MirrorMaker nodes with the same name");
+
+        MirrorMaker result = new MirrorMaker(mmProps);
+        mirrorMakers.put(name, result);
+
+        result.start();
+
+        return result;
+    }
+
+    /**
+     * Test that a multi-node dedicated cluster is able to dynamically detect new topics at runtime

Review Comment:
   Can you elaborate on how this will ensure that the task config update works with the inter-node comm?
   Is there a way to ensure that the MirrorSourceConnector will be assigned to a follower?
   Just wondering if this test can pass most of the time, and will flakily fail 1 out of X if there is an issue with the task config update.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] C0urante commented on pull request #13137: KAFKA-15086: Intra-cluster communication for Mirror Maker 2

Posted by "C0urante (via GitHub)" <gi...@apache.org>.
C0urante commented on PR #13137:
URL: https://github.com/apache/kafka/pull/13137#issuecomment-1424408278

   Thanks for the reviews, everyone!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org