You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@metron.apache.org by mm...@apache.org on 2019/09/04 17:47:45 UTC

[metron] branch master updated: METRON-2227 Increase Kafka test harness timeout (tigerquoll via mmiklavc) closes apache/metron#1493

This is an automated email from the ASF dual-hosted git repository.

mmiklavcic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/metron.git


The following commit(s) were added to refs/heads/master by this push:
     new 67fa5a4  METRON-2227 Increase Kafka test harness timeout (tigerquoll via mmiklavc) closes apache/metron#1493
67fa5a4 is described below

commit 67fa5a403b01d0f7c8607c06e63f9d06f8b8cbc1
Author: tigerquoll <ti...@outlook.com>
AuthorDate: Wed Sep 4 11:47:04 2019 -0600

    METRON-2227 Increase Kafka test harness timeout (tigerquoll via mmiklavc) closes apache/metron#1493
---
 .../integration/components/KafkaComponent.java     | 22 +++++++++++++---------
 1 file changed, 13 insertions(+), 9 deletions(-)

diff --git a/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/components/KafkaComponent.java b/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/components/KafkaComponent.java
index 08910be..0fa414b 100644
--- a/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/components/KafkaComponent.java
+++ b/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/components/KafkaComponent.java
@@ -65,6 +65,10 @@ import org.slf4j.LoggerFactory;
 public class KafkaComponent implements InMemoryComponent {
 
   protected static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+  public static final long KAFKA_PROPAGATE_TIMEOUT_MS = 10000l;
+  public static final int ZK_SESSION_TIMEOUT_MS = 30000;
+  public static final int ZK_CONNECTION_TIMEOUT_MS = 30000;
+  public static final int KAFKA_ZOOKEEPER_TIMEOUT_MS = 1000000;
 
   public static class Topic {
     public int numPartitions;
@@ -159,11 +163,11 @@ public class KafkaComponent implements InMemoryComponent {
     // setup Zookeeper
     zookeeperConnectString = topologyProperties.getProperty(ZKServerComponent.ZOOKEEPER_PROPERTY);
 
-    zkClient = new ZkClient(zookeeperConnectString, 30000, 30000, ZKStringSerializer$.MODULE$);
+    zkClient = new ZkClient(zookeeperConnectString, ZK_SESSION_TIMEOUT_MS, ZK_CONNECTION_TIMEOUT_MS, ZKStringSerializer$.MODULE$);
 
     // setup Broker
     Properties props = TestUtilsWrapper.createBrokerConfig(0, zookeeperConnectString, brokerPort);
-    props.setProperty("zookeeper.connection.timeout.ms","1000000");
+    props.setProperty("zookeeper.connection.timeout.ms", Integer.toString(KAFKA_ZOOKEEPER_TIMEOUT_MS));
     KafkaConfig config = new KafkaConfig(props);
     Time mock = new MockTime();
     kafkaServer = TestUtils.createServer(config, mock);
@@ -175,7 +179,7 @@ public class KafkaComponent implements InMemoryComponent {
 
     for(Topic topic : getTopics()) {
       try {
-        createTopic(topic.name, topic.numPartitions, true);
+        createTopic(topic.name, topic.numPartitions, KAFKA_PROPAGATE_TIMEOUT_MS);
       } catch (InterruptedException e) {
         throw new RuntimeException("Unable to create topic", e);
       }
@@ -288,26 +292,26 @@ public class KafkaComponent implements InMemoryComponent {
   }
 
   public void createTopic(String name) throws InterruptedException {
-    createTopic(name, 1, true);
+    createTopic(name, 1, KAFKA_PROPAGATE_TIMEOUT_MS);
   }
 
-  public void waitUntilMetadataIsPropagated(String topic, int numPartitions) {
+  public void waitUntilMetadataIsPropagated(String topic, int numPartitions, long timeOutMS) {
     List<KafkaServer> servers = new ArrayList<>();
     servers.add(kafkaServer);
     for(int part = 0;part < numPartitions;++part) {
-      TestUtils.waitUntilMetadataIsPropagated(scala.collection.JavaConversions.asScalaBuffer(servers), topic, part, 5000);
+      TestUtils.waitUntilMetadataIsPropagated(scala.collection.JavaConversions.asScalaBuffer(servers), topic, part, timeOutMS);
     }
   }
 
-  public void createTopic(String name, int numPartitions, boolean waitUntilMetadataIsPropagated) throws InterruptedException {
+  public void createTopic(String name, int numPartitions, long waitThisLongForMetadataToPropagate) throws InterruptedException {
     ZkUtils zkUtils = null;
     Level oldLevel = UnitTestHelper.getJavaLoggingLevel();
     try {
       UnitTestHelper.setJavaLoggingLevel(Level.OFF);
       zkUtils = ZkUtils.apply(zookeeperConnectString, 30000, 30000, false);
       AdminUtilsWrapper.createTopic(zkUtils, name, numPartitions, 1, new Properties());
-      if (waitUntilMetadataIsPropagated) {
-        waitUntilMetadataIsPropagated(name, numPartitions);
+      if (waitThisLongForMetadataToPropagate > 0) {
+        waitUntilMetadataIsPropagated(name, numPartitions, waitThisLongForMetadataToPropagate);
       }
     }catch(TopicExistsException tee) {
     }finally {