You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by th...@apache.org on 2017/08/02 14:07:24 UTC

[apex-malhar] branch master updated: APEXMALHAR-2458 remove hard coded port numbers from Kafka example tests.

This is an automated email from the ASF dual-hosted git repository.

thw pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/apex-malhar.git


The following commit(s) were added to refs/heads/master by this push:
     new e4e649f  APEXMALHAR-2458 remove hard coded port numbers from Kafka example tests.
e4e649f is described below

commit e4e649f6e382625d78ea748331fe3240b883f435
Author: Thomas Weise <th...@apache.org>
AuthorDate: Mon Jul 31 09:43:29 2017 -0700

    APEXMALHAR-2458 remove hard coded port numbers from Kafka example tests.
---
 .../apex/examples/kafka/exactlyonceoutput/ApplicationTest.java | 10 ++++++++--
 .../apache/apex/examples/kafka/hdfs2kafka/ApplicationTest.java |  8 ++++++--
 .../apache/apex/examples/kafka/kafka2hdfs/ApplicationTest.java |  7 ++++---
 3 files changed, 18 insertions(+), 7 deletions(-)

diff --git a/examples/kafka/src/test/java/org/apache/apex/examples/kafka/exactlyonceoutput/ApplicationTest.java b/examples/kafka/src/test/java/org/apache/apex/examples/kafka/exactlyonceoutput/ApplicationTest.java
index cc4f63c..c5cfcea 100644
--- a/examples/kafka/src/test/java/org/apache/apex/examples/kafka/exactlyonceoutput/ApplicationTest.java
+++ b/examples/kafka/src/test/java/org/apache/apex/examples/kafka/exactlyonceoutput/ApplicationTest.java
@@ -36,6 +36,7 @@ import org.slf4j.LoggerFactory;
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.net.NetUtils;
 
 import com.datatorrent.api.LocalMode;
 
@@ -52,8 +53,9 @@ public class ApplicationTest
   private static final String directory = "target/exactlyonceoutput";
   private String tuplesUntilKill;
 
-  private static final int zkPort = 2181;
-  private static final int brokerPort = 9092;
+  private final int zkPort = NetUtils.getFreeSocketPort();
+  private final int brokerPort = NetUtils.getFreeSocketPort();
+  private final String broker = "localhost:" + brokerPort;
 
   private static final Logger logger = LoggerFactory.getLogger(ApplicationTest.class);
 
@@ -107,6 +109,10 @@ public class ApplicationTest
     conf.addResource(this.getClass().getResourceAsStream("/META-INF/properties-KafkaExactlyOnceOutput.xml"));
     conf.set("dt.operator.passthrough.prop.directoryPath", directory);
     conf.set("dt.operator.validationToFile.prop.filePath", directory);
+    conf.set("dt.operator.kafkaTopicExactly.prop.clusters", broker);
+    conf.set("dt.operator.kafkaTopicAtLeast.prop.clusters", broker);
+    conf.set("dt.operator.kafkaOutputOperator.prop.properties(bootstrap.servers)", broker);
+    conf.set("dt.operator.kafkaExactlyOnceOutputOperator.prop.properties(bootstrap.servers)", broker);
     tuplesUntilKill = conf.get("dt.operator.passthrough.prop.tuplesUntilKill");
     return conf;
   }
diff --git a/examples/kafka/src/test/java/org/apache/apex/examples/kafka/hdfs2kafka/ApplicationTest.java b/examples/kafka/src/test/java/org/apache/apex/examples/kafka/hdfs2kafka/ApplicationTest.java
index d00236b..2550c6a 100644
--- a/examples/kafka/src/test/java/org/apache/apex/examples/kafka/hdfs2kafka/ApplicationTest.java
+++ b/examples/kafka/src/test/java/org/apache/apex/examples/kafka/hdfs2kafka/ApplicationTest.java
@@ -33,6 +33,7 @@ import org.slf4j.LoggerFactory;
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.net.NetUtils;
 
 import com.datatorrent.api.LocalMode;
 
@@ -51,8 +52,8 @@ public class ApplicationTest
   private static final String directory = "target/hdfs2kafka";
   private static final String FILE_NAME = "messages.txt";
 
-  private static final int zkPort = 2181;
-  private static final int brokerPort = 9092;
+  private static final int zkPort = NetUtils.getFreeSocketPort();
+  private static final int brokerPort = NetUtils.getFreeSocketPort();
   private static final String BROKER = "localhost:" + brokerPort;
   //private static final String FILE_PATH = FILE_DIR + "/" + FILE_NAME + ".0";     // first part
 
@@ -119,6 +120,9 @@ public class ApplicationTest
     Configuration conf = new Configuration(false);
     conf.addResource(this.getClass().getResourceAsStream("/META-INF/properties-hdfs2kafka.xml"));
     conf.set("dt.operator.lines.prop.directory", directory);
+    conf.set("dt.operator.kafkaOutput.prop.producerProperties",
+        "serializer.class=kafka.serializer.StringEncoder,producer.type=async,metadata.broker.list=" + BROKER);
+
     return conf;
   }
 
diff --git a/examples/kafka/src/test/java/org/apache/apex/examples/kafka/kafka2hdfs/ApplicationTest.java b/examples/kafka/src/test/java/org/apache/apex/examples/kafka/kafka2hdfs/ApplicationTest.java
index be38c53..b5b83b7 100644
--- a/examples/kafka/src/test/java/org/apache/apex/examples/kafka/kafka2hdfs/ApplicationTest.java
+++ b/examples/kafka/src/test/java/org/apache/apex/examples/kafka/kafka2hdfs/ApplicationTest.java
@@ -33,6 +33,7 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.apex.malhar.kafka.AbstractKafkaInputOperator;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.net.NetUtils;
 
 import com.datatorrent.api.LocalMode;
 
@@ -50,11 +51,11 @@ public class ApplicationTest
   private static final Logger LOG = LoggerFactory.getLogger(ApplicationTest.class);
   private static final String TOPIC = "kafka2hdfs";
 
-  private static final int zkPort = 2181;
-  private static final int brokerPort = 9092;
+  private static final int zkPort = NetUtils.getFreeSocketPort();
+  private static final int brokerPort = NetUtils.getFreeSocketPort();
   private static final String BROKER = "localhost:" + brokerPort;
   private static final String FILE_NAME = "test";
-  private static final String FILE_DIR = "/tmp/FromKafka";
+  private static final String FILE_DIR = "./target/tmp/FromKafka";
   private static final String FILE_PATH = FILE_DIR + "/" + FILE_NAME + ".0";     // first part
 
   // test messages

-- 
To stop receiving notification emails like this one, please contact
"commits@apex.apache.org" <co...@apex.apache.org>.