You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@metron.apache.org by mm...@apache.org on 2019/09/04 17:47:45 UTC
[metron] branch master updated: METRON-2227 Increase Kafka test harness timeout (tigerquoll via mmiklavc) closes apache/metron#1493
This is an automated email from the ASF dual-hosted git repository.
mmiklavcic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/metron.git
The following commit(s) were added to refs/heads/master by this push:
new 67fa5a4 METRON-2227 Increase Kafka test harness timeout (tigerquoll via mmiklavc) closes apache/metron#1493
67fa5a4 is described below
commit 67fa5a403b01d0f7c8607c06e63f9d06f8b8cbc1
Author: tigerquoll <ti...@outlook.com>
AuthorDate: Wed Sep 4 11:47:04 2019 -0600
METRON-2227 Increase Kafka test harness timeout (tigerquoll via mmiklavc) closes apache/metron#1493
---
.../integration/components/KafkaComponent.java | 22 +++++++++++++---------
1 file changed, 13 insertions(+), 9 deletions(-)
diff --git a/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/components/KafkaComponent.java b/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/components/KafkaComponent.java
index 08910be..0fa414b 100644
--- a/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/components/KafkaComponent.java
+++ b/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/components/KafkaComponent.java
@@ -65,6 +65,10 @@ import org.slf4j.LoggerFactory;
public class KafkaComponent implements InMemoryComponent {
protected static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+ public static final long KAFKA_PROPAGATE_TIMEOUT_MS = 10000l;
+ public static final int ZK_SESSION_TIMEOUT_MS = 30000;
+ public static final int ZK_CONNECTION_TIMEOUT_MS = 30000;
+ public static final int KAFKA_ZOOKEEPER_TIMEOUT_MS = 1000000;
public static class Topic {
public int numPartitions;
@@ -159,11 +163,11 @@ public class KafkaComponent implements InMemoryComponent {
// setup Zookeeper
zookeeperConnectString = topologyProperties.getProperty(ZKServerComponent.ZOOKEEPER_PROPERTY);
- zkClient = new ZkClient(zookeeperConnectString, 30000, 30000, ZKStringSerializer$.MODULE$);
+ zkClient = new ZkClient(zookeeperConnectString, ZK_SESSION_TIMEOUT_MS, ZK_CONNECTION_TIMEOUT_MS, ZKStringSerializer$.MODULE$);
// setup Broker
Properties props = TestUtilsWrapper.createBrokerConfig(0, zookeeperConnectString, brokerPort);
- props.setProperty("zookeeper.connection.timeout.ms","1000000");
+ props.setProperty("zookeeper.connection.timeout.ms", Integer.toString(KAFKA_ZOOKEEPER_TIMEOUT_MS));
KafkaConfig config = new KafkaConfig(props);
Time mock = new MockTime();
kafkaServer = TestUtils.createServer(config, mock);
@@ -175,7 +179,7 @@ public class KafkaComponent implements InMemoryComponent {
for(Topic topic : getTopics()) {
try {
- createTopic(topic.name, topic.numPartitions, true);
+ createTopic(topic.name, topic.numPartitions, KAFKA_PROPAGATE_TIMEOUT_MS);
} catch (InterruptedException e) {
throw new RuntimeException("Unable to create topic", e);
}
@@ -288,26 +292,26 @@ public class KafkaComponent implements InMemoryComponent {
}
public void createTopic(String name) throws InterruptedException {
- createTopic(name, 1, true);
+ createTopic(name, 1, KAFKA_PROPAGATE_TIMEOUT_MS);
}
- public void waitUntilMetadataIsPropagated(String topic, int numPartitions) {
+ public void waitUntilMetadataIsPropagated(String topic, int numPartitions, long timeOutMS) {
List<KafkaServer> servers = new ArrayList<>();
servers.add(kafkaServer);
for(int part = 0;part < numPartitions;++part) {
- TestUtils.waitUntilMetadataIsPropagated(scala.collection.JavaConversions.asScalaBuffer(servers), topic, part, 5000);
+ TestUtils.waitUntilMetadataIsPropagated(scala.collection.JavaConversions.asScalaBuffer(servers), topic, part, timeOutMS);
}
}
- public void createTopic(String name, int numPartitions, boolean waitUntilMetadataIsPropagated) throws InterruptedException {
+ public void createTopic(String name, int numPartitions, long waitThisLongForMetadataToPropagate) throws InterruptedException {
ZkUtils zkUtils = null;
Level oldLevel = UnitTestHelper.getJavaLoggingLevel();
try {
UnitTestHelper.setJavaLoggingLevel(Level.OFF);
zkUtils = ZkUtils.apply(zookeeperConnectString, 30000, 30000, false);
AdminUtilsWrapper.createTopic(zkUtils, name, numPartitions, 1, new Properties());
- if (waitUntilMetadataIsPropagated) {
- waitUntilMetadataIsPropagated(name, numPartitions);
+ if (waitThisLongForMetadataToPropagate > 0) {
+ waitUntilMetadataIsPropagated(name, numPartitions, waitThisLongForMetadataToPropagate);
}
}catch(TopicExistsException tee) {
}finally {