You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by na...@apache.org on 2021/01/29 21:39:33 UTC
[ignite-extensions] branch master updated: IGNITE-14010 Fixes flaky KafkaIgniteStreamerSelfTest. (#39)
This is an automated email from the ASF dual-hosted git repository.
namelchev pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite-extensions.git
The following commit(s) were added to refs/heads/master by this push:
new 1774aaa IGNITE-14010 Fixes flaky KafkaIgniteStreamerSelfTest. (#39)
1774aaa is described below
commit 1774aaa62eafd5f8820bf8a39d063f0bcd2a2150
Author: Mikhail Petrov <32...@users.noreply.github.com>
AuthorDate: Sat Jan 30 00:39:28 2021 +0300
IGNITE-14010 Fixes flaky KafkaIgniteStreamerSelfTest. (#39)
---
.../ignite/stream/kafka/KafkaIgniteStreamerSelfTest.java | 12 ++++++------
1 file changed, 6 insertions(+), 6 deletions(-)
diff --git a/modules/kafka-ext/src/test/java/org/apache/ignite/stream/kafka/KafkaIgniteStreamerSelfTest.java b/modules/kafka-ext/src/test/java/org/apache/ignite/stream/kafka/KafkaIgniteStreamerSelfTest.java
index 3f20d87..484608d 100644
--- a/modules/kafka-ext/src/test/java/org/apache/ignite/stream/kafka/KafkaIgniteStreamerSelfTest.java
+++ b/modules/kafka-ext/src/test/java/org/apache/ignite/stream/kafka/KafkaIgniteStreamerSelfTest.java
@@ -26,7 +26,6 @@ import java.util.Map;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
@@ -46,6 +45,7 @@ import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.junit.Test;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_PUT;
/**
@@ -208,9 +208,6 @@ public class KafkaIgniteStreamerSelfTest extends GridCommonAbstractTest {
return entries;
});
- // Start kafka streamer.
- kafkaStmr.start();
-
final CountDownLatch latch = new CountDownLatch(CNT);
IgniteBiPredicate<UUID, CacheEvent> locLsnr = new IgniteBiPredicate<UUID, CacheEvent>() {
@@ -238,9 +235,12 @@ public class KafkaIgniteStreamerSelfTest extends GridCommonAbstractTest {
ignite.events(ignite.cluster().forCacheNodes(DEFAULT_CACHE_NAME)).remoteListen(locLsnr, null, EVT_CACHE_OBJECT_PUT);
- // Checks all events successfully processed in 10 seconds.
+ // Start kafka streamer.
+ kafkaStmr.start();
+
+ // Checks all events successfully processed in test timeout.
assertTrue("Failed to wait latch completion, still wait " + latch.getCount() + " events",
- latch.await(10, TimeUnit.SECONDS));
+ latch.await(getTestTimeout(), MILLISECONDS));
for (Map.Entry<String, String> entry : keyValMap.entrySet())
assertEquals(entry.getValue(), cache.get(entry.getKey()));