Posted to jira@kafka.apache.org by "divijvaidya (via GitHub)" <gi...@apache.org> on 2023/02/21 19:01:37 UTC

[GitHub] [kafka] divijvaidya opened a new pull request, #13284: KAFKA-14718: Make MirrorMaker startup synchronously depend on connector start

divijvaidya opened a new pull request, #13284:
URL: https://github.com/apache/kafka/pull/13284

   # Motivation
   The test suite fails with the following error:
   ```
   org.opentest4j.AssertionFailedError: Condition not met within timeout 30000. topic A.test-topic-0 was not created on cluster B- ._~:/?#[]@!$&'()*+;="<>%{}|\^`618 in time ==> expected: <true> but was: <false>
   ```
   Root cause of the failure is available in stdout:
   ```
   [2023-02-17 18:56:32,308] INFO This node is a follower for A->B- ._~:/?#[]@!$&'()*+;="<>%{}|\^`618. Using existing connector configuration. (org.apache.kafka.connect.mirror.MirrorMaker:234)
   [2023-02-17 18:56:42,567] INFO Kafka MirrorMaker stopping (org.apache.kafka.connect.mirror.MirrorMaker:202)
   [2023-02-17 18:56:43,371] INFO Kafka MirrorMaker stopped. (org.apache.kafka.connect.mirror.MirrorMaker:213)
   [2023-02-17 18:56:43,371] INFO Kafka MirrorMaker stopping (org.apache.kafka.connect.mirror.MirrorMaker:202)
   [2023-02-17 18:56:44,376] INFO Kafka MirrorMaker stopped. (org.apache.kafka.connect.mirror.MirrorMaker:213)
   [2023-02-17 18:56:44,377] INFO Kafka MirrorMaker stopping (org.apache.kafka.connect.mirror.MirrorMaker:202)
   [2023-02-17 18:56:45,782] ERROR Failed to configure MirrorSourceConnector connector for A->B- ._~:/?#[]@!$&'()*+;="<>%{}|\^`618 (org.apache.kafka.connect.mirror.MirrorMaker:236)
   org.apache.kafka.connect.errors.ConnectException: Worker is shutting down
   	at org.apache.kafka.connect.runtime.distributed.DistributedHerder.halt(DistributedHerder.java:766)
   	at org.apache.kafka.connect.runtime.distributed.DistributedHerder.run(DistributedHerder.java:361)
   	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
   	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
   	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
   	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
   ```
   
   The test tries to assert topic replication on the second cluster, but the MirrorMaker nodes themselves are unavailable. They are unavailable because the connectors fail to configure, and the connectors fail to configure because the worker is already shutting down by the time they are ready to be configured.
   
   # Change
   
   This highlights a gap in the MirrorMaker startup sequence: it does not wait for the herders to configure the connectors before declaring itself "started". As a result, MirrorMaker can claim that it has started while the herders are still configuring the connectors, and during this window MirrorMaker does not perform any replication.
   
   With this change, we add a dependency on connector configuration for the MirrorMaker to declare itself as started.
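   A minimal sketch of this idea, assuming (hypothetically) that each herder exposes its connector-configuration step as a `CompletableFuture<Void>`. `StartupGate` and `awaitConnectorsConfigured` are illustrative names, not part of MirrorMaker. Startup blocks until every future completes and rethrows a failure such as `Worker is shutting down` instead of logging it and carrying on:

   ```java
   import java.time.Duration;
   import java.util.List;
   import java.util.concurrent.CompletableFuture;
   import java.util.concurrent.ExecutionException;
   import java.util.concurrent.TimeUnit;
   import java.util.concurrent.TimeoutException;

   /**
    * Hypothetical startup gate (not part of Kafka): "started" is only declared once every
    * connector-configuration future has completed successfully.
    */
   final class StartupGate {
       private StartupGate() { }

       static void awaitConnectorsConfigured(List<CompletableFuture<Void>> configFutures, Duration timeout) {
           CompletableFuture<Void> all =
                   CompletableFuture.allOf(configFutures.toArray(new CompletableFuture<?>[0]));
           try {
               all.get(timeout.toMillis(), TimeUnit.MILLISECONDS);
           } catch (ExecutionException e) {
               // A connector failed to configure (e.g. "Worker is shutting down"): fail startup
               throw new IllegalStateException("Connector configuration failed", e.getCause());
           } catch (TimeoutException e) {
               throw new IllegalStateException("Connectors not configured within " + timeout, e);
           } catch (InterruptedException e) {
               Thread.currentThread().interrupt();
               throw new IllegalStateException("Interrupted while awaiting connector configuration", e);
           }
       }
   }
   ```

   Because `CompletableFuture.allOf` only completes once every input has completed, a failed connector alongside one that never finishes configuring surfaces as a timeout rather than as the failure itself.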
   
   # Rejected alternative
   An alternative approach is to modify the test to wait for connectors to be configured on all 3 MM nodes before starting the actual test, i.e., before making the call to create the topic. However, this alternative would artificially hide problems that users of MirrorMaker may also face.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] divijvaidya commented on pull request #13284: KAFKA-14718: Make MirrorMaker startup synchronously depend on connector start

Posted by "divijvaidya (via GitHub)" <gi...@apache.org>.
divijvaidya commented on PR #13284:
URL: https://github.com/apache/kafka/pull/13284#issuecomment-1450105039

   > This is less concerning because the MirrorMaker class isn't part of public API;
   
   It's not technically part of the public API, but it has a `main()` method that users execute. As you pointed out, today they have to rely on logs/metrics to figure out whether MM started correctly or not.
   
   I agree with creating `connectorStatus()` and using it in tests as you suggested, but we should additionally block the completion of `main()` until all connectors are online. The blocking part could be taken up in a separate PR, where we can discuss its merits and demerits.
   
   Meanwhile, I will add `connectorStatus()` and relevant changes to this PR.




[GitHub] [kafka] divijvaidya commented on a diff in pull request #13284: KAFKA-14718: Fix flaky DedicatedMirrorIntegrationTest

Posted by "divijvaidya (via GitHub)" <gi...@apache.org>.
divijvaidya commented on code in PR #13284:
URL: https://github.com/apache/kafka/pull/13284#discussion_r1248164637


##########
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/DedicatedMirrorIntegrationTest.java:
##########
@@ -273,6 +288,30 @@ private void writeToTopic(EmbeddedKafkaCluster cluster, String topic, int numMes
         }
     }
 
+    private void awaitMirrorMakerStart(final SourceAndTarget sourceAndTarget) throws InterruptedException {
+        waitForCondition(() -> {
+            try {
+                return mirrorMakers.values().stream().allMatch(
+                    mm -> CONNECTOR_CLASSES.stream().allMatch(
+                        connectorClazz -> isConnectorRunningForMirrorMaker(connectorClazz, mm, sourceAndTarget)));
+            } catch (Exception ex) {
+                log.error("Something unexpected occurred. Unable to check for startup status for mirror maker for {}", sourceAndTarget, ex);
+                throw ex;
+            }
+        }, MM_START_UP_TIMEOUT_MS, "MirrorMaker instances did not transition to running in time");
+    }
+
+    private <T extends SourceConnector> void awaitConnectorTasksStart(final Class<T> clazz, final String source, String target) throws InterruptedException {
+        waitForCondition(() -> {
+            try {
+                return mirrorMakers.values().stream().allMatch(mm -> isTaskRunningForMirrorMakerConnector(clazz, mm, source, target));

Review Comment:
   Done. Only checking for one node now.



##########
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/DedicatedMirrorIntegrationTest.java:
##########
@@ -273,6 +288,30 @@ private void writeToTopic(EmbeddedKafkaCluster cluster, String topic, int numMes
         }
     }
 
+    private void awaitMirrorMakerStart(final SourceAndTarget sourceAndTarget) throws InterruptedException {
+        waitForCondition(() -> {
+            try {
+                return mirrorMakers.values().stream().allMatch(

Review Comment:
   Done in latest revision.





[GitHub] [kafka] divijvaidya commented on pull request #13284: KAFKA-14718: Fix flaky DedicatedMirrorIntegrationTest

Posted by "divijvaidya (via GitHub)" <gi...@apache.org>.
divijvaidya commented on PR #13284:
URL: https://github.com/apache/kafka/pull/13284#issuecomment-1580790338

   @C0urante thank you for the suggestions. The new code should be close to what you suggested. The test passes locally for me now. Please feel free to review when you get a chance.




[GitHub] [kafka] C0urante merged pull request #13284: KAFKA-14718: Fix flaky DedicatedMirrorIntegrationTest

Posted by "C0urante (via GitHub)" <gi...@apache.org>.
C0urante merged PR #13284:
URL: https://github.com/apache/kafka/pull/13284




[GitHub] [kafka] divijvaidya commented on pull request #13284: KAFKA-14718: Make MirrorMaker startup synchronously depend on connector start

Posted by "divijvaidya (via GitHub)" <gi...@apache.org>.
divijvaidya commented on PR #13284:
URL: https://github.com/apache/kafka/pull/13284#issuecomment-1448267997

   Thank you for the detailed explanation @C0urante!
   
   > so this doesn't really buy us much right now.
   
   Oops, my bad! I did not notice that.
   
   > this only guarantees that connector configs have been written to the config topic. 
   
   That is true. We need a more comprehensive check. I was basing this change on the fact that we use similar verification in other tests (see `waitUntilMirrorMakerIsRunning()` in `MirrorConnectorsIntegrationBaseTest`). It's more of a "better than nothing" approach here.
   
   > And even if we did tweak this PR to do that kind of blocking, we'd still have to choose a reasonable amount of time to wait for all of these operations to complete.
   
   I think that fixing the test by extending the timeout might hide the problems that users may be facing in production as well. Ideally, `MirrorMaker#start()` should return a `Future` that could be tracked to check whether startup has completed. In absence of such a mechanism, today, a user might call `MirrorMaker#start()` and assume that it has started correctly while in the background, connector start up might have failed (just as it did in this test scenario). Hence, I believe, we need a better mechanism to check for successful startup of MirrorMaker. Thoughts?
   
   > Do you think it might be worth it to just bump the timeouts of these tests?
   
   We could do that AND, as a partial safety check, block topic creation in this test until connectors have at least been configured (see the rejected alternative). Let's reach a conclusion on the point above first, and then we can proceed with this option if that's what we agree on.




[GitHub] [kafka] divijvaidya commented on pull request #13284: KAFKA-14718: Fix flaky DedicatedMirrorIntegrationTest

Posted by "divijvaidya (via GitHub)" <gi...@apache.org>.
divijvaidya commented on PR #13284:
URL: https://github.com/apache/kafka/pull/13284#issuecomment-1615032737

   I had to rebase on trunk to resolve merge conflicts. Changes in the latest revision:
   
   1. Moved `awaitMirrorMakerStart` into the test cases.
   2. Only check for one MirrorMaker node to start instead of all of them.
   3. Only check for connector tasks in one MM node instead of all of the nodes.
   4. Use `NoRetryException` to fail fast in case of a task/connector failure. (Thank you for introducing me to this nice test utility!)
   5. Task/connector failures are already logged at ERROR level.
   
   This test passes locally. @C0urante ready for review.




[GitHub] [kafka] divijvaidya commented on pull request #13284: KAFKA-14718: Fix flaky DedicatedMirrorIntegrationTest

Posted by "divijvaidya (via GitHub)" <gi...@apache.org>.
divijvaidya commented on PR #13284:
URL: https://github.com/apache/kafka/pull/13284#issuecomment-1451769479

   The test failures seem unrelated (and `./gradlew unitTest` passes for me locally), so I am rebasing against trunk in the hope that the Gradle issues go away.




[GitHub] [kafka] C0urante commented on pull request #13284: KAFKA-14718: Make MirrorMaker startup synchronously depend on connector start

Posted by "C0urante (via GitHub)" <gi...@apache.org>.
C0urante commented on PR #13284:
URL: https://github.com/apache/kafka/pull/13284#issuecomment-1448579807

   > I think that fixing the test by extending the timeout might hide the problems that users may be facing in production as well.
   
   I don't think that bumping the timeouts in these tests is likely to hide issues that may surface in production; rather, I'm only worried that they'll make local testing and development more cumbersome.
   
   > Hence, I believe, we need a better mechanism to check for successful startup of MirrorMaker. Thoughts?
   
   Agreed 👍 IMO MM2 observability in general can be improved. Right now you're basically limited to JMX, other custom metrics reporters, and logs. There's no public API for viewing cluster-wide health info (the closest thing is reading directly from the status topic, but that's not public API and we shouldn't count on users doing that or recommend it).
   
   If there's an issue starting one of the connectors, instead of failing startup of the MM2 node, we [log a message](https://github.com/apache/kafka/blob/f586fa59d3f938e04bda4e8143ddb1c4310eaf78/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMaker.java#L232-L237) and move on. I can't imagine this is very easy for users to work with, though maybe I'm underestimating the utility of JMX and logs here.
   
   > In absence of such a mechanism, today, a user might call MirrorMaker#start() and assume that it has started correctly while in the background, connector start up might have failed (just as it did in this test scenario).
   
   This is less concerning because the `MirrorMaker` class isn't part of public API; users are not expected to directly interact with any instances of this class and we do not support any use cases like that right now.
   
   
   I'm wondering if, instead of a `Future`-based approach, we might add an internal API to the `MirrorMaker` class to expose the status of the connectors running on it?
   
   A brief sketch:
   
   ```java
   public class MirrorMaker {
       public ConnectorStateInfo connectorStatus(String source, String target, String connector) {
           SourceAndTarget sourceAndTarget = new SourceAndTarget(source, target);
           checkHerder(sourceAndTarget);
           return herders.get(sourceAndTarget).connectorStatus(connector);
       }
   }
   ```
   
   We could then leverage this API in the `DedicatedMirrorIntegrationTest` suite to implement an equivalent of the `waitUntilMirrorMakerIsRunning` method from the `MirrorConnectorsIntegrationBaseTest` suite.
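   For illustration, a simplified polling wait of the kind such a test helper could use. `StatusPoller` is hypothetical and only stands in for Kafka's `TestUtils.waitForCondition`. In the test, the condition would wrap a call to the proposed `connectorStatus` method, e.g. `() -> "RUNNING".equals(mm.connectorStatus(source, target, "MirrorSourceConnector").connector().state())`:

   ```java
   import java.util.concurrent.TimeUnit;
   import java.util.function.BooleanSupplier;

   /** Hypothetical, simplified stand-in for a test utility such as TestUtils.waitForCondition. */
   final class StatusPoller {
       private StatusPoller() { }

       static void waitUntil(BooleanSupplier condition, long timeoutMs, long pollIntervalMs, String message) {
           long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
           // Re-evaluate the status check until it passes or the deadline is reached
           while (!condition.getAsBoolean()) {
               if (System.nanoTime() - deadline >= 0) {
                   throw new AssertionError(message);
               }
               try {
                   Thread.sleep(pollIntervalMs);
               } catch (InterruptedException e) {
                   Thread.currentThread().interrupt();
                   throw new AssertionError("Interrupted while waiting: " + message, e);
               }
           }
       }
   }
   ```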




[GitHub] [kafka] divijvaidya commented on pull request #13284: KAFKA-14718: Fix flaky DedicatedMirrorIntegrationTest

Posted by "divijvaidya (via GitHub)" <gi...@apache.org>.
divijvaidya commented on PR #13284:
URL: https://github.com/apache/kafka/pull/13284#issuecomment-1592807196

   ```
   [Build / JDK 17 and Scala 2.13 / org.apache.kafka.common.security.oauthbearer.internals.secured.RefreshingHttpsJwksTest.testSecondaryRefreshAfterElapsedDelay()](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-13284/9/testReport/junit/org.apache.kafka.common.security.oauthbearer.internals.secured/RefreshingHttpsJwksTest/Build___JDK_17_and_Scala_2_13___testSecondaryRefreshAfterElapsedDelay__/)
   [Build / JDK 17 and Scala 2.13 / kafka.zk.ZkMigrationIntegrationTest.[1] Type=ZK, Name=testNewAndChangedTopicsInDualWrite, MetadataVersion=3.4-IV0, Security=PLAINTEXT](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-13284/9/testReport/junit/kafka.zk/ZkMigrationIntegrationTest/Build___JDK_17_and_Scala_2_13____1__Type_ZK__Name_testNewAndChangedTopicsInDualWrite__MetadataVersion_3_4_IV0__Security_PLAINTEXT/)
   [Build / JDK 17 and Scala 2.13 / org.apache.kafka.trogdor.coordinator.CoordinatorTest.testTaskRequestWithOldStartMsGetsUpdated()](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-13284/9/testReport/junit/org.apache.kafka.trogdor.coordinator/CoordinatorTest/Build___JDK_17_and_Scala_2_13___testTaskRequestWithOldStartMsGetsUpdated__/)
   [Build / JDK 11 and Scala 2.13 / org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationSSLTest.testSyncTopicConfigs()](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-13284/9/testReport/junit/org.apache.kafka.connect.mirror.integration/MirrorConnectorsIntegrationSSLTest/Build___JDK_11_and_Scala_2_13___testSyncTopicConfigs__/)
   [Build / JDK 8 and Scala 2.12 / org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest.testSyncTopicConfigs()](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-13284/9/testReport/junit/org.apache.kafka.connect.mirror.integration/MirrorConnectorsIntegrationBaseTest/Build___JDK_8_and_Scala_2_12___testSyncTopicConfigs__/)
   [Build / JDK 17 and Scala 2.13 / org.apache.kafka.controller.QuorumControllerTest.testBalancePartitionLeaders()](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-13284/9/testReport/junit/org.apache.kafka.controller/QuorumControllerTest/Build___JDK_17_and_Scala_2_13___testBalancePartitionLeaders__/)
   [Build / JDK 11 and Scala 2.13 / org.apache.kafka.controller.QuorumControllerTest.testBalancePartitionLeaders()](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-13284/9/testReport/junit/org.apache.kafka.controller/QuorumControllerTest/Build___JDK_11_and_Scala_2_13___testBalancePartitionLeaders__/)
   [Build / JDK 8 and Scala 2.12 / org.apache.kafka.controller.QuorumControllerTest.testBalancePartitionLeaders()](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-13284/9/testReport/junit/org.apache.kafka.controller/QuorumControllerTest/Build___JDK_8_and_Scala_2_12___testBalancePartitionLeaders___2/)
   ```
   
   Test failures are unrelated. @C0urante pinging for review when you get a chance.




[GitHub] [kafka] C0urante commented on a diff in pull request #13284: KAFKA-14718: Fix flaky DedicatedMirrorIntegrationTest

Posted by "C0urante (via GitHub)" <gi...@apache.org>.
C0urante commented on code in PR #13284:
URL: https://github.com/apache/kafka/pull/13284#discussion_r1123546376


##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMaker.java:
##########
@@ -322,6 +324,29 @@ public void run() {
         }
     }
 
+    /**
+     * Validates that the underlying connectors and tasks are running.
+     * Visible for testing purpose.
+     */
+    boolean isConfiguredAndRunning() {
+        return CONNECTOR_CLASSES.stream().allMatch(connectorClazz -> {
+            final String connName = connectorClazz.getSimpleName();
+            return herderPairs.stream().allMatch(sourceAndTarget -> {
+                final ConnectorStateInfo connectorStatus = this.connectorStatus(sourceAndTarget, connName);
+                return connectorStatus != null
+                    // verify that connector state is set to running
+                    && connectorStatus.connector().state().equals(AbstractStatus.State.RUNNING.toString())
+                    // verify that all tasks are set to running
+                    && connectorStatus.tasks().stream().allMatch(s -> s.state().equals(AbstractStatus.State.RUNNING.toString()));

Review Comment:
   This doesn't actually verify that tasks are running (i.e., there could be an empty set of tasks for a connector and this condition would still be true).
   
   We probably want to either 1) require that at least one task exists or 2) require the user to specify the expected number of tasks for each connector, in addition to this check (which ensures that every task that does exist is in the `RUNNING` state).
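   The pitfall is that `Stream.allMatch` returns `true` for an empty stream. A small sketch contrasting the original check with the two options, using plain state strings as a hypothetical stand-in for `ConnectorStateInfo`:

   ```java
   import java.util.List;

   /** Hypothetical helpers over a connector's task states (stand-in for ConnectorStateInfo). */
   final class TaskCheck {
       private TaskCheck() { }

       // Original condition: vacuously true when the connector has no tasks at all
       static boolean allTasksRunning(List<String> taskStates) {
           return taskStates.stream().allMatch("RUNNING"::equals);
       }

       // Option 1: require at least one task, and every existing task must be RUNNING
       static boolean atLeastOneTaskAndAllRunning(List<String> taskStates) {
           return !taskStates.isEmpty() && allTasksRunning(taskStates);
       }

       // Option 2: the caller states exactly how many tasks it expects
       static boolean expectedTasksRunning(List<String> taskStates, int expectedTasks) {
           return taskStates.size() == expectedTasks && allTasksRunning(taskStates);
       }
   }
   ```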



##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMaker.java:
##########
@@ -322,6 +324,29 @@ public void run() {
         }
     }
 
+    /**
+     * Validates that the underlying connectors and tasks are running.
+     * Visible for testing purpose.
+     */
+    boolean isConfiguredAndRunning() {
+        return CONNECTOR_CLASSES.stream().allMatch(connectorClazz -> {
+            final String connName = connectorClazz.getSimpleName();
+            return herderPairs.stream().allMatch(sourceAndTarget -> {
+                final ConnectorStateInfo connectorStatus = this.connectorStatus(sourceAndTarget, connName);
+                return connectorStatus != null
+                    // verify that connector state is set to running
+                    && connectorStatus.connector().state().equals(AbstractStatus.State.RUNNING.toString())
+                    // verify that all tasks are set to running
+                    && connectorStatus.tasks().stream().allMatch(s -> s.state().equals(AbstractStatus.State.RUNNING.toString()));
+            });
+        });
+    }

Review Comment:
   We should expose the smallest testing-only APIs necessary in non-testing code, and implement other logic in testing-only places.
   
   So in this case, the API we'd expose here would be the `connectorStatus` method, and the logic in this method would go in, e.g., the `DedicatedMirrorIntegrationTest` suite.



##########
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/DedicatedMirrorIntegrationTest.java:
##########
@@ -62,8 +60,10 @@ public void setup() {
     @AfterEach
     public void teardown() throws Throwable {
         AtomicReference<Throwable> shutdownFailure = new AtomicReference<>();
-        mirrorMakers.forEach((name, mirrorMaker) ->
-            Utils.closeQuietly(mirrorMaker::stop, "MirrorMaker worker '" + name + "'", shutdownFailure)
+        mirrorMakers.forEach((name, mirrorMaker) -> {
+                Utils.closeQuietly(mirrorMaker::stop, "MirrorMaker worker '" + name + "'", shutdownFailure);
+                mirrorMaker.awaitStop();
+            }

Review Comment:
   We can optimize a little here, which will hopefully reduce the impact this change has on test time when running multiple MM2 nodes:
   ```suggestion
           mirrorMakers.forEach((name, mirrorMaker) -> {
                   Utils.closeQuietly(mirrorMaker::stop, "MirrorMaker worker '" + name + "'", shutdownFailure);
               }
           );
           mirrorMakers.forEach((name, mirrorMaker) -> {
                       mirrorMaker.awaitStop();
               }
   ```
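   The saving comes from splitting shutdown into two phases: initiate the stop on every node first, then wait on each one. A self-contained sketch with a hypothetical `FakeNode` whose `stop()` only initiates shutdown and whose `awaitStop()` blocks, mirroring how `stop()` and `awaitStop()` are used in the diff above:

   ```java
   import java.util.List;
   import java.util.concurrent.CountDownLatch;

   /** Hypothetical node: stop() only initiates shutdown, awaitStop() blocks until it has finished. */
   final class FakeNode {
       private final String name;
       private final long shutdownMillis;
       private final List<String> events; // only touched from the calling thread
       private final CountDownLatch stopped = new CountDownLatch(1);

       FakeNode(String name, long shutdownMillis, List<String> events) {
           this.name = name;
           this.shutdownMillis = shutdownMillis;
           this.events = events;
       }

       void stop() {
           events.add("stop " + name);
           // Simulate a shutdown that completes asynchronously in the background
           new Thread(() -> {
               try {
                   Thread.sleep(shutdownMillis);
               } catch (InterruptedException e) {
                   Thread.currentThread().interrupt();
               } finally {
                   stopped.countDown();
               }
           }).start();
       }

       void awaitStop() {
           try {
               stopped.await();
           } catch (InterruptedException e) {
               Thread.currentThread().interrupt();
               throw new IllegalStateException("Interrupted while awaiting shutdown of " + name, e);
           }
           events.add("awaited " + name);
       }

       boolean isStopped() {
           return stopped.getCount() == 0;
       }
   }

   final class TwoPhaseShutdown {
       private TwoPhaseShutdown() { }

       static void stopAll(List<FakeNode> nodes) {
           // Phase 1: initiate shutdown on every node so they wind down concurrently
           nodes.forEach(FakeNode::stop);
           // Phase 2: block only now, so the total wait tracks the slowest node, not the sum
           nodes.forEach(FakeNode::awaitStop);
       }
   }
   ```

   With three nodes that each take 50 ms to shut down, the total wait is roughly that of the slowest node rather than the sum of all three, because the shutdowns overlap.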



##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMaker.java:
##########
@@ -322,6 +324,29 @@ public void run() {
         }
     }
 
+    /**
+     * Validates that the underlying connectors and tasks are running.
+     * Visible for testing purpose.
+     */
+    boolean isConfiguredAndRunning() {
+        return CONNECTOR_CLASSES.stream().allMatch(connectorClazz -> {
+            final String connName = connectorClazz.getSimpleName();
+            return herderPairs.stream().allMatch(sourceAndTarget -> {
+                final ConnectorStateInfo connectorStatus = this.connectorStatus(sourceAndTarget, connName);
+                return connectorStatus != null
+                    // verify that connector state is set to running
+                    && connectorStatus.connector().state().equals(AbstractStatus.State.RUNNING.toString())
+                    // verify that all tasks are set to running
+                    && connectorStatus.tasks().stream().allMatch(s -> s.state().equals(AbstractStatus.State.RUNNING.toString()));
+            });
+        });
+    }
+
+    private ConnectorStateInfo connectorStatus(SourceAndTarget sourceAndTarget, String connector) {
+        checkHerder(sourceAndTarget);
+        return herders.get(sourceAndTarget).connectorStatus(connector);
+    }

Review Comment:
   This can be made `public` and we can move all other assertion logic around the status of MM2 into testing code.



##########
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/DedicatedMirrorIntegrationTest.java:
##########
@@ -14,14 +14,12 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.kafka.connect.mirror.integration;

Review Comment:
   This move isn't necessary; we can and should keep this test suite in the `integration` package.
   
   It's fine to add `public` methods to the `MirrorMaker` class since that class is not part of the public API.





[GitHub] [kafka] divijvaidya commented on a diff in pull request #13284: KAFKA-14718: Fix flaky DedicatedMirrorIntegrationTest

Posted by "divijvaidya (via GitHub)" <gi...@apache.org>.
divijvaidya commented on code in PR #13284:
URL: https://github.com/apache/kafka/pull/13284#discussion_r1121820892


##########
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/DedicatedMirrorIntegrationTest.java:
##########
@@ -14,14 +14,12 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.kafka.connect.mirror.integration;

Review Comment:
   Note for reviewers: this move was required so that the test can access `isConfiguredAndRunning` without having to make the method public.





[GitHub] [kafka] C0urante commented on a diff in pull request #13284: KAFKA-14718: Fix flaky DedicatedMirrorIntegrationTest

Posted by "C0urante (via GitHub)" <gi...@apache.org>.
C0urante commented on code in PR #13284:
URL: https://github.com/apache/kafka/pull/13284#discussion_r1237643809


##########
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/DedicatedMirrorIntegrationTest.java:
##########
@@ -273,6 +288,30 @@ private void writeToTopic(EmbeddedKafkaCluster cluster, String topic, int numMes
         }
     }
 
+    private void awaitMirrorMakerStart(final SourceAndTarget sourceAndTarget) throws InterruptedException {
+        waitForCondition(() -> {
+            try {
+                return mirrorMakers.values().stream().allMatch(
+                    mm -> CONNECTOR_CLASSES.stream().allMatch(
+                        connectorClazz -> isConnectorRunningForMirrorMaker(connectorClazz, mm, sourceAndTarget)));
+            } catch (Exception ex) {
+                log.error("Something unexpected occurred. Unable to check for startup status for mirror maker for {}", sourceAndTarget, ex);
+                throw ex;
+            }
+        }, MM_START_UP_TIMEOUT_MS, "MirrorMaker instances did not transition to running in time");
+    }
+
+    private <T extends SourceConnector> void awaitConnectorTasksStart(final Class<T> clazz, final String source, String target) throws InterruptedException {
+        waitForCondition(() -> {
+            try {
+                return mirrorMakers.values().stream().allMatch(mm -> isTaskRunningForMirrorMakerConnector(clazz, mm, source, target));

Review Comment:
   Same thought RE only performing assertions with a single `MirrorMaker` instance instead of all of them.



##########
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/DedicatedMirrorIntegrationTest.java:
##########
@@ -288,4 +327,28 @@ private void awaitTopicContent(EmbeddedKafkaCluster cluster, String clusterName,
         }
     }
 
+    /**
+     * Validates that the underlying connector is running for the given MirrorMaker.
+     */
+    private boolean isConnectorRunningForMirrorMaker(final Class<?> connectorClazz, final MirrorMaker mm, final SourceAndTarget sourceAndTarget) {
+        final String connName = connectorClazz.getSimpleName();
+        final ConnectorStateInfo connectorStatus = mm.connectorStatus(sourceAndTarget, connName);
+        return connectorStatus != null
+            // verify that connector state is set to running
+            && connectorStatus.connector().state().equals(AbstractStatus.State.RUNNING.toString());
+    }
+
+    /**
+     * Validates that the tasks are associated with the connector and they are running for the given MirrorMaker.
+     */
+    private <T extends SourceConnector> boolean isTaskRunningForMirrorMakerConnector(final Class<T> connectorClazz, final MirrorMaker mm, final String source, final String target) {
+        final SourceAndTarget sourceAndTarget = new SourceAndTarget(source, target);
+        final String connName = connectorClazz.getSimpleName();
+        final ConnectorStateInfo connectorStatus = mm.connectorStatus(sourceAndTarget, connName);
+        return isConnectorRunningForMirrorMaker(connectorClazz, mm, sourceAndTarget)
+            // verify that at least one task exists
+            && !connectorStatus.tasks().isEmpty()
+            // verify that tasks are set to running
+            && connectorStatus.tasks().stream().allMatch(s -> s.state().equals(AbstractStatus.State.RUNNING.toString()));

Review Comment:
   Same thought RE throwing a `NoRetryException` if a connector (or, here, task) is `FAILED`.



##########
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/DedicatedMirrorIntegrationTest.java:
##########
@@ -94,6 +100,11 @@ private MirrorMaker startMirrorMaker(String name, Map<String, String> mmProps) {
 
         result.start();
 
+        // wait for connectors to start
+        String[] clusterNames = mmProps.get("clusters").split(",");
+        SourceAndTarget sourceAndTarget = new SourceAndTarget(clusterNames[0].trim(), clusterNames[1].trim());
+        awaitMirrorMakerStart(sourceAndTarget);

Review Comment:
   This assumes we define a one-way flow with two clusters, from the first cluster to the second.
   
   This assumption may not hold in future updates and people may think we did this intentionally (instead of out of convenience).
   
   I think it'd be better to move the calls to `awaitMirrorMakerStart` into the test cases themselves, since that's where the clusters for each run are currently defined.
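   To make the assumption concrete: `clusters` only lists cluster aliases, so the inline logic above always derives a single flow from the first alias to the second, whatever flows a test actually enables. A minimal sketch, with a hypothetical `Flow` class standing in for `SourceAndTarget`:

   ```java
   // Hypothetical stand-in for SourceAndTarget, used only to illustrate the point.
   final class Flow {
       final String source;
       final String target;

       Flow(String source, String target) {
           this.source = source;
           this.target = target;
       }

       // Same derivation as the inline code in startMirrorMaker: first alias -> second alias.
       // A third cluster, or a reverse (second -> first) flow, is silently ignored.
       static Flow inferFromClusters(String clusters) {
           String[] names = clusters.split(",");
           return new Flow(names[0].trim(), names[1].trim());
       }

       @Override
       public String toString() {
           return source + "->" + target;
       }
   }
   ```

   With `clusters = A, B, C` this yields only `A->B`. A test case that calls `awaitMirrorMakerStart(new SourceAndTarget("A", "B"))` itself, right after starting its nodes, states the intended direction explicitly instead.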



##########
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/DedicatedMirrorIntegrationTest.java:
##########
@@ -288,4 +327,28 @@ private void awaitTopicContent(EmbeddedKafkaCluster cluster, String clusterName,
         }
     }
 
+    /**
+     * Validates that the underlying connector is running for the given MirrorMaker.
+     */
+    private boolean isConnectorRunningForMirrorMaker(final Class<?> connectorClazz, final MirrorMaker mm, final SourceAndTarget sourceAndTarget) {
+        final String connName = connectorClazz.getSimpleName();
+        final ConnectorStateInfo connectorStatus = mm.connectorStatus(sourceAndTarget, connName);
+        return connectorStatus != null
+            // verify that connector state is set to running
+            && connectorStatus.connector().state().equals(AbstractStatus.State.RUNNING.toString());

Review Comment:
   Can we throw a [NoRetryException](https://github.com/apache/kafka/blob/3c059133d3008d87f018f2efa4af27027fd5d18e/clients/src/test/java/org/apache/kafka/test/NoRetryException.java) if we see that the connector's status is `FAILED`? That way, developers won't have to wait for two minutes to learn that startup has failed. We can also include (or at least log) the stack trace for the failed connector.
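   A minimal sketch of the fail-fast idea, assuming simplified stand-ins: `NoRetry` plays the role of `NoRetryException`, and `waitFor` is a hypothetical polling helper, not Kafka's `TestUtils.waitForCondition`. The condition returns false while the connector is still starting, but throws once it sees `FAILED`, carrying the reported trace, and the loop rethrows that exception immediately instead of waiting out the timeout:

   ```java
   import java.util.function.BooleanSupplier;

   // Simplified, hypothetical sketch of fail-fast polling; not Kafka's TestUtils.
   final class FailFastWait {
       enum State { UNASSIGNED, RUNNING, FAILED }

       // Stand-in for NoRetryException: a condition throws it to stop polling at once.
       static final class NoRetry extends RuntimeException {
           NoRetry(String message) {
               super(message);
           }
       }

       // Condition for one connector: false while it is still starting,
       // NoRetry (with the reported trace) once it has FAILED.
       static boolean connectorRunning(String connName, State state, String trace) {
           if (state == State.FAILED) {
               throw new NoRetry("Connector " + connName + " failed:\n" + trace);
           }
           return state == State.RUNNING;
       }

       // Polls until the condition is true or the timeout expires. Other runtime
       // exceptions are treated as transient and retried; NoRetry is rethrown immediately.
       static void waitFor(BooleanSupplier condition, long timeoutMs, long pollMs, String description)
               throws InterruptedException {
           long deadline = System.currentTimeMillis() + timeoutMs;
           RuntimeException lastError = null;
           while (true) {
               try {
                   if (condition.getAsBoolean()) {
                       return;
                   }
                   lastError = null;
               } catch (NoRetry e) {
                   throw e;
               } catch (RuntimeException e) {
                   lastError = e;
               }
               if (System.currentTimeMillis() >= deadline) {
                   throw new AssertionError(
                       "Condition not met within timeout " + timeoutMs + ". " + description, lastError);
               }
               Thread.sleep(pollMs);
           }
       }
   }
   ```

   Other runtime exceptions are retried on the assumption that they are transient (for example, a status lookup racing with startup); only the explicit `NoRetry` signal short-circuits the wait.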



##########
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/DedicatedMirrorIntegrationTest.java:
##########
@@ -273,6 +288,30 @@ private void writeToTopic(EmbeddedKafkaCluster cluster, String topic, int numMes
         }
     }
 
+    private void awaitMirrorMakerStart(final SourceAndTarget sourceAndTarget) throws InterruptedException {
+        waitForCondition(() -> {
+            try {
+                return mirrorMakers.values().stream().allMatch(

Review Comment:
   We don't have to perform this assertion for every MM2 node; it's enough to verify that it succeeds with at least one.
   
   I think we may want to refactor this method to accept a `MirrorMaker` instance and require the caller to provide a single instance that can be used to query the status of connectors/tasks. This would work well if we also move the call sites for this method into individual test cases, so that we only have to call it once per replication flow instead of once per MM2 node.
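   A hypothetical sketch of that refactor, with a minimal `Node` interface standing in for `MirrorMaker` (the real `connectorStatus(SourceAndTarget, String)` returns a `ConnectorStateInfo`): the caller chooses one node, and the check runs once for a given flow rather than once per node.

   ```java
   import java.util.List;

   // Hypothetical sketch: query connector states through one caller-chosen node.
   final class SingleNodeAwait {
       // Minimal stand-in for the part of MirrorMaker the check needs.
       interface Node {
           String connectorState(String source, String target, String connName);
       }

       // Only the given node is queried; per the review, success on one node
       // is enough, so looping over every MM2 node adds nothing.
       static boolean allConnectorsRunning(Node node, String source, String target, List<String> connNames) {
           return connNames.stream()
               .allMatch(name -> "RUNNING".equals(node.connectorState(source, target, name)));
       }
   }
   ```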



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] divijvaidya commented on a diff in pull request #13284: KAFKA-14718: Fix flaky DedicatedMirrorIntegrationTest

Posted by "divijvaidya (via GitHub)" <gi...@apache.org>.
divijvaidya commented on code in PR #13284:
URL: https://github.com/apache/kafka/pull/13284#discussion_r1248164007


##########
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/DedicatedMirrorIntegrationTest.java:
##########
@@ -288,4 +327,28 @@ private void awaitTopicContent(EmbeddedKafkaCluster cluster, String clusterName,
         }
     }
 
+    /**
+     * Validates that the underlying connector is running for the given MirrorMaker.
+     */
+    private boolean isConnectorRunningForMirrorMaker(final Class<?> connectorClazz, final MirrorMaker mm, final SourceAndTarget sourceAndTarget) {
+        final String connName = connectorClazz.getSimpleName();
+        final ConnectorStateInfo connectorStatus = mm.connectorStatus(sourceAndTarget, connName);
+        return connectorStatus != null
+            // verify that connector state is set to running
+            && connectorStatus.connector().state().equals(AbstractStatus.State.RUNNING.toString());

Review Comment:
   Done in latest revision. 





[GitHub] [kafka] divijvaidya commented on a diff in pull request #13284: KAFKA-14718: Fix flaky DedicatedMirrorIntegrationTest

Posted by "divijvaidya (via GitHub)" <gi...@apache.org>.
divijvaidya commented on code in PR #13284:
URL: https://github.com/apache/kafka/pull/13284#discussion_r1135316350


##########
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/DedicatedMirrorIntegrationTest.java:
##########
@@ -62,8 +60,10 @@ public void setup() {
     @AfterEach
     public void teardown() throws Throwable {
         AtomicReference<Throwable> shutdownFailure = new AtomicReference<>();
-        mirrorMakers.forEach((name, mirrorMaker) ->
-            Utils.closeQuietly(mirrorMaker::stop, "MirrorMaker worker '" + name + "'", shutdownFailure)
+        mirrorMakers.forEach((name, mirrorMaker) -> {
+                Utils.closeQuietly(mirrorMaker::stop, "MirrorMaker worker '" + name + "'", shutdownFailure);
+                mirrorMaker.awaitStop();
+            }

Review Comment:
   done in latest commit.



##########
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/DedicatedMirrorIntegrationTest.java:
##########
@@ -62,8 +60,10 @@ public void setup() {
     @AfterEach
     public void teardown() throws Throwable {
         AtomicReference<Throwable> shutdownFailure = new AtomicReference<>();
-        mirrorMakers.forEach((name, mirrorMaker) ->
-            Utils.closeQuietly(mirrorMaker::stop, "MirrorMaker worker '" + name + "'", shutdownFailure)
+        mirrorMakers.forEach((name, mirrorMaker) -> {
+                Utils.closeQuietly(mirrorMaker::stop, "MirrorMaker worker '" + name + "'", shutdownFailure);
+                mirrorMaker.awaitStop();
+            }

Review Comment:
   good idea. done in latest commit.





[GitHub] [kafka] divijvaidya commented on pull request #13284: KAFKA-14718: Fix flaky DedicatedMirrorIntegrationTest

Posted by "divijvaidya (via GitHub)" <gi...@apache.org>.
divijvaidya commented on PR #13284:
URL: https://github.com/apache/kafka/pull/13284#issuecomment-1522220743

   Hey @C0urante 
   I haven't forgotten about this PR :) I am stuck right now and would appreciate your help here.
   I have made the changes you suggested, but the condition below is never satisfied.
   ```
   // verify that at least one task exists
   && !connectorStatus.tasks().isEmpty()
   ```
   I am not very familiar with the MM2 code or the Connect framework. If you have a suggestion for fixing it, I would be happy to do so; otherwise, I will probably need some time to dig into this further.




[GitHub] [kafka] C0urante commented on pull request #13284: KAFKA-14718: Fix flaky DedicatedMirrorIntegrationTest

Posted by "C0urante (via GitHub)" <gi...@apache.org>.
C0urante commented on PR #13284:
URL: https://github.com/apache/kafka/pull/13284#issuecomment-1574216434

   @divijvaidya No worries about the delay--looks like we're both guilty on that front 😅
   
   I took a look at this, and the issue is that the `MirrorCheckpointConnector` and `MirrorSourceConnector` instances are generating empty lists of task configs. That makes sense: the former never generates tasks during these tests, and the latter only generates tasks once a to-be-mirrored topic has been created on the upstream cluster, which currently happens only after our startup check has finished.
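   A deliberately simplified, hypothetical model of this behaviour, loosely mirroring Connect's `Connector.taskConfigs(int maxTasks)`: a connector that derives task configs from the set of to-be-replicated topics returns an empty list while that set is empty, so no tasks are started and a check requiring `!connectorStatus.tasks().isEmpty()` cannot pass until a topic exists. The `topics` key and the round-robin split are illustrative assumptions, not `MirrorSourceConnector`'s actual assignment logic.

   ```java
   import java.util.ArrayList;
   import java.util.HashMap;
   import java.util.List;
   import java.util.Map;

   // Hypothetical model of a connector whose task count depends on upstream topics.
   final class TopicDrivenConnector {
       private final List<String> topicsToReplicate = new ArrayList<>();

       // Called when a to-be-replicated topic appears on the upstream cluster.
       void topicCreated(String topic) {
           topicsToReplicate.add(topic);
       }

       List<Map<String, String>> taskConfigs(int maxTasks) {
           List<Map<String, String>> configs = new ArrayList<>();
           if (topicsToReplicate.isEmpty()) {
               // Nothing to replicate yet: zero task configs, so zero tasks are started.
               return configs;
           }
           int numTasks = Math.min(maxTasks, topicsToReplicate.size());
           for (int i = 0; i < numTasks; i++) {
               configs.add(new HashMap<>());
           }
           // Spread topics round-robin across tasks as a comma-separated list.
           for (int i = 0; i < topicsToReplicate.size(); i++) {
               configs.get(i % numTasks).merge("topics", topicsToReplicate.get(i), (a, b) -> a + "," + b);
           }
           return configs;
       }
   }
   ```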
   
   Correct me if I'm wrong: the idea here is to add granularity to test failure messages so that we can differentiate between MM2 startup issues (caused by lagginess or connector/task failures) and issues with the actual replication logic performed by MM2. If that's the case, do you think it might make sense to do the following?
   
   - Alter the "await startup" logic to fail immediately if it detects any failed connectors or tasks
   - Alter the "await startup" logic to operate on a per-connector-type basis, and selectively await the startup of different connector types at different points in the test (e.g., we could await the startup of the heartbeat connector in any place where we currently call `waitForMirrorMakersToStart`, but only await the startup of the source connector and its tasks once we've created a to-be-replicated topic on the upstream cluster)
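   A hypothetical sketch of those two suggestions combined, using simplified stand-ins for `ConnectorStateInfo` and `NoRetryException`: both stages fail immediately on any `FAILED` connector or task, but only the later stage requires tasks to exist.

   ```java
   import java.util.List;

   // Hypothetical sketch of staged, per-connector-type startup checks.
   final class StagedStartup {
       enum State { UNASSIGNED, RUNNING, FAILED }

       // Simplified stand-in for ConnectorStateInfo: connector state plus task states.
       static final class Status {
           final State connector;
           final List<State> tasks;

           Status(State connector, List<State> tasks) {
               this.connector = connector;
               this.tasks = tasks;
           }
       }

       // Stand-in for NoRetryException: aborts polling as soon as a failure is seen.
       static final class NoRetry extends RuntimeException {
           NoRetry(String message) {
               super(message);
           }
       }

       private static void failIfAnyFailed(String name, Status status) {
           if (status.connector == State.FAILED || status.tasks.contains(State.FAILED)) {
               throw new NoRetry(name + " or one of its tasks is FAILED");
           }
       }

       // Stage 1, right after MM2 starts: only the connector itself must be RUNNING,
       // since it may legitimately have zero tasks at this point.
       static boolean connectorStarted(String name, Status status) {
           failIfAnyFailed(name, status);
           return status.connector == State.RUNNING;
       }

       // Stage 2, once the test has created a to-be-replicated topic upstream:
       // additionally require at least one task, with all tasks RUNNING.
       static boolean connectorAndTasksStarted(String name, Status status) {
           return connectorStarted(name, status)
               && !status.tasks.isEmpty()
               && status.tasks.stream().allMatch(s -> s == State.RUNNING);
       }
   }
   ```

   For example, `connectorStarted` could be polled for the heartbeat connector right after startup, and `connectorAndTasksStarted` for the source connector only after the test has created its upstream topic.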




[GitHub] [kafka] divijvaidya commented on pull request #13284: KAFKA-14718: Fix flaky DedicatedMirrorIntegrationTest

Posted by "divijvaidya (via GitHub)" <gi...@apache.org>.
divijvaidya commented on PR #13284:
URL: https://github.com/apache/kafka/pull/13284#issuecomment-1629620630

   Thank you for your patience on this one @C0urante. Appreciate it!




[GitHub] [kafka] divijvaidya commented on pull request #13284: KAFKA-14718: Make MirrorMaker startup synchronously depend on connector start

Posted by "divijvaidya (via GitHub)" <gi...@apache.org>.
divijvaidya commented on PR #13284:
URL: https://github.com/apache/kafka/pull/13284#issuecomment-1438985658

   @C0urante please take a look when you get a chance. `unitTest` and `integrationTest` succeed for me locally.




[GitHub] [kafka] divijvaidya commented on pull request #13284: KAFKA-14718: Fix flaky DedicatedMirrorIntegrationTest

Posted by "divijvaidya (via GitHub)" <gi...@apache.org>.
divijvaidya commented on PR #13284:
URL: https://github.com/apache/kafka/pull/13284#issuecomment-1450247692

   @C0urante ready for your review. I have increased the timeout to 2 minutes and also added a wait for MirrorMaker to become ready. In a separate JIRA we should discuss how we can make the startup synchronous; I have created one such JIRA at https://issues.apache.org/jira/browse/KAFKA-14773 




[GitHub] [kafka] divijvaidya commented on a diff in pull request #13284: KAFKA-14718: Fix flaky DedicatedMirrorIntegrationTest

Posted by "divijvaidya (via GitHub)" <gi...@apache.org>.
divijvaidya commented on code in PR #13284:
URL: https://github.com/apache/kafka/pull/13284#discussion_r1248163268


##########
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/DedicatedMirrorIntegrationTest.java:
##########
@@ -288,4 +327,28 @@ private void awaitTopicContent(EmbeddedKafkaCluster cluster, String clusterName,
         }
     }
 
+    /**
+     * Validates that the underlying connector is running for the given MirrorMaker.
+     */
+    private boolean isConnectorRunningForMirrorMaker(final Class<?> connectorClazz, final MirrorMaker mm, final SourceAndTarget sourceAndTarget) {
+        final String connName = connectorClazz.getSimpleName();
+        final ConnectorStateInfo connectorStatus = mm.connectorStatus(sourceAndTarget, connName);
+        return connectorStatus != null
+            // verify that connector state is set to running
+            && connectorStatus.connector().state().equals(AbstractStatus.State.RUNNING.toString());
+    }
+
+    /**
+     * Validates that the tasks are associated with the connector and they are running for the given MirrorMaker.
+     */
+    private <T extends SourceConnector> boolean isTaskRunningForMirrorMakerConnector(final Class<T> connectorClazz, final MirrorMaker mm, final String source, final String target) {
+        final SourceAndTarget sourceAndTarget = new SourceAndTarget(source, target);
+        final String connName = connectorClazz.getSimpleName();
+        final ConnectorStateInfo connectorStatus = mm.connectorStatus(sourceAndTarget, connName);
+        return isConnectorRunningForMirrorMaker(connectorClazz, mm, sourceAndTarget)
+            // verify that at least one task exists
+            && !connectorStatus.tasks().isEmpty()
+            // verify that tasks are set to running
+            && connectorStatus.tasks().stream().allMatch(s -> s.state().equals(AbstractStatus.State.RUNNING.toString()));

Review Comment:
   Done in latest revision.





[GitHub] [kafka] divijvaidya commented on a diff in pull request #13284: KAFKA-14718: Fix flaky DedicatedMirrorIntegrationTest

Posted by "divijvaidya (via GitHub)" <gi...@apache.org>.
divijvaidya commented on code in PR #13284:
URL: https://github.com/apache/kafka/pull/13284#discussion_r1248166033


##########
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/DedicatedMirrorIntegrationTest.java:
##########
@@ -40,16 +47,16 @@
 import java.util.concurrent.atomic.AtomicReference;
 
 import static org.apache.kafka.clients.consumer.ConsumerConfig.AUTO_OFFSET_RESET_CONFIG;
+import static org.apache.kafka.connect.mirror.MirrorMaker.CONNECTOR_CLASSES;
 import static org.apache.kafka.test.TestUtils.waitForCondition;
 
 @Tag("integration")
 public class DedicatedMirrorIntegrationTest {
 
     private static final Logger log = LoggerFactory.getLogger(DedicatedMirrorIntegrationTest.class);
-
-    private static final int TOPIC_CREATION_TIMEOUT_MS = 120_000;

Review Comment:
   Reverted to the original value, since we now have better logic to ensure the correct startup sequence.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org