You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by or...@apache.org on 2021/02/15 10:39:07 UTC
[camel-kafka-connector] 06/10: Cleanup the check state logic on the
KafkaConnectEmbeddedService
This is an automated email from the ASF dual-hosted git repository.
orpiske pushed a commit to branch camel-master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git
commit 09df84fe919a3e87ffad97b2be55fcbda5875150
Author: Otavio Rodolfo Piske <op...@redhat.com>
AuthorDate: Fri Feb 12 16:05:44 2021 +0100
Cleanup the check state logic on the KafkaConnectEmbeddedService
---
.../common/services/kafkaconnect/KafkaConnectEmbedded.java | 11 ++++++++---
1 file changed, 8 insertions(+), 3 deletions(-)
diff --git a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/services/kafkaconnect/KafkaConnectEmbedded.java b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/services/kafkaconnect/KafkaConnectEmbedded.java
index bc6b868..52af0a5 100644
--- a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/services/kafkaconnect/KafkaConnectEmbedded.java
+++ b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/services/kafkaconnect/KafkaConnectEmbedded.java
@@ -64,6 +64,12 @@ public class KafkaConnectEmbedded implements KafkaConnectService {
LOG.trace("Added the new connector");
}
+ private boolean doCheckState(ConnectorStateInfo connectorStateInfo, Integer expectedTaskNumber) {
+ return connectorStateInfo.tasks().size() >= expectedTaskNumber
+ && connectorStateInfo.connector().state().equals(AbstractStatus.State.RUNNING.toString())
+ && connectorStateInfo.tasks().stream().allMatch(s -> s.state().equals(AbstractStatus.State.RUNNING.toString()));
+ }
+
@Override
public void initializeConnectorBlocking(ConnectorPropertyFactory propertyFactory, Integer expectedTaskNumber) throws InterruptedException {
initializeConnector(propertyFactory);
@@ -73,9 +79,8 @@ public class KafkaConnectEmbedded implements KafkaConnectService {
connectorStateInfo = cluster.connectorStatus(connectorName);
Thread.sleep(20L);
} while (connectorStateInfo == null);
- return connectorStateInfo.tasks().size() >= expectedTaskNumber
- && connectorStateInfo.connector().state().equals(AbstractStatus.State.RUNNING.toString())
- && connectorStateInfo.tasks().stream().allMatch(s -> s.state().equals(AbstractStatus.State.RUNNING.toString()));
+
+ return doCheckState(connectorStateInfo, expectedTaskNumber);
}, 30000L, "The connector " + connectorName + " did not start within a reasonable time");
}