You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by or...@apache.org on 2021/02/15 10:39:07 UTC

[camel-kafka-connector] 06/10: Clean up the check state logic on the KafkaConnectEmbeddedService

This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch camel-master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git

commit 09df84fe919a3e87ffad97b2be55fcbda5875150
Author: Otavio Rodolfo Piske <op...@redhat.com>
AuthorDate: Fri Feb 12 16:05:44 2021 +0100

    Clean up the check state logic on the KafkaConnectEmbeddedService
---
 .../common/services/kafkaconnect/KafkaConnectEmbedded.java    | 11 ++++++++---
 1 file changed, 8 insertions(+), 3 deletions(-)

diff --git a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/services/kafkaconnect/KafkaConnectEmbedded.java b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/services/kafkaconnect/KafkaConnectEmbedded.java
index bc6b868..52af0a5 100644
--- a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/services/kafkaconnect/KafkaConnectEmbedded.java
+++ b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/services/kafkaconnect/KafkaConnectEmbedded.java
@@ -64,6 +64,12 @@ public class KafkaConnectEmbedded implements KafkaConnectService {
         LOG.trace("Added the new connector");
     }
 
+    private boolean doCheckState(ConnectorStateInfo connectorStateInfo, Integer expectedTaskNumber) {
+        return connectorStateInfo.tasks().size() >= expectedTaskNumber
+                && connectorStateInfo.connector().state().equals(AbstractStatus.State.RUNNING.toString())
+                && connectorStateInfo.tasks().stream().allMatch(s -> s.state().equals(AbstractStatus.State.RUNNING.toString()));
+    }
+
     @Override
     public void initializeConnectorBlocking(ConnectorPropertyFactory propertyFactory, Integer expectedTaskNumber) throws InterruptedException {
         initializeConnector(propertyFactory);
@@ -73,9 +79,8 @@ public class KafkaConnectEmbedded implements KafkaConnectService {
                 connectorStateInfo = cluster.connectorStatus(connectorName);
                 Thread.sleep(20L);
             } while (connectorStateInfo == null);
-            return  connectorStateInfo.tasks().size() >= expectedTaskNumber
-                    && connectorStateInfo.connector().state().equals(AbstractStatus.State.RUNNING.toString())
-                    && connectorStateInfo.tasks().stream().allMatch(s -> s.state().equals(AbstractStatus.State.RUNNING.toString()));
+
+            return doCheckState(connectorStateInfo, expectedTaskNumber);
         }, 30000L, "The connector " + connectorName + " did not start within a reasonable time");
     }