You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by th...@apache.org on 2017/08/02 14:07:24 UTC
[apex-malhar] branch master updated: APEXMALHAR-2458 remove hard
coded port numbers from Kafka example tests.
This is an automated email from the ASF dual-hosted git repository.
thw pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/apex-malhar.git
The following commit(s) were added to refs/heads/master by this push:
new e4e649f APEXMALHAR-2458 remove hard coded port numbers from Kafka example tests.
e4e649f is described below
commit e4e649f6e382625d78ea748331fe3240b883f435
Author: Thomas Weise <th...@apache.org>
AuthorDate: Mon Jul 31 09:43:29 2017 -0700
APEXMALHAR-2458 remove hard coded port numbers from Kafka example tests.
---
.../apex/examples/kafka/exactlyonceoutput/ApplicationTest.java | 10 ++++++++--
.../apache/apex/examples/kafka/hdfs2kafka/ApplicationTest.java | 8 ++++++--
.../apache/apex/examples/kafka/kafka2hdfs/ApplicationTest.java | 7 ++++---
3 files changed, 18 insertions(+), 7 deletions(-)
diff --git a/examples/kafka/src/test/java/org/apache/apex/examples/kafka/exactlyonceoutput/ApplicationTest.java b/examples/kafka/src/test/java/org/apache/apex/examples/kafka/exactlyonceoutput/ApplicationTest.java
index cc4f63c..c5cfcea 100644
--- a/examples/kafka/src/test/java/org/apache/apex/examples/kafka/exactlyonceoutput/ApplicationTest.java
+++ b/examples/kafka/src/test/java/org/apache/apex/examples/kafka/exactlyonceoutput/ApplicationTest.java
@@ -36,6 +36,7 @@ import org.slf4j.LoggerFactory;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.net.NetUtils;
import com.datatorrent.api.LocalMode;
@@ -52,8 +53,9 @@ public class ApplicationTest
private static final String directory = "target/exactlyonceoutput";
private String tuplesUntilKill;
- private static final int zkPort = 2181;
- private static final int brokerPort = 9092;
+ private final int zkPort = NetUtils.getFreeSocketPort();
+ private final int brokerPort = NetUtils.getFreeSocketPort();
+ private final String broker = "localhost:" + brokerPort;
private static final Logger logger = LoggerFactory.getLogger(ApplicationTest.class);
@@ -107,6 +109,10 @@ public class ApplicationTest
conf.addResource(this.getClass().getResourceAsStream("/META-INF/properties-KafkaExactlyOnceOutput.xml"));
conf.set("dt.operator.passthrough.prop.directoryPath", directory);
conf.set("dt.operator.validationToFile.prop.filePath", directory);
+ conf.set("dt.operator.kafkaTopicExactly.prop.clusters", broker);
+ conf.set("dt.operator.kafkaTopicAtLeast.prop.clusters", broker);
+ conf.set("dt.operator.kafkaOutputOperator.prop.properties(bootstrap.servers)", broker);
+ conf.set("dt.operator.kafkaExactlyOnceOutputOperator.prop.properties(bootstrap.servers)", broker);
tuplesUntilKill = conf.get("dt.operator.passthrough.prop.tuplesUntilKill");
return conf;
}
diff --git a/examples/kafka/src/test/java/org/apache/apex/examples/kafka/hdfs2kafka/ApplicationTest.java b/examples/kafka/src/test/java/org/apache/apex/examples/kafka/hdfs2kafka/ApplicationTest.java
index d00236b..2550c6a 100644
--- a/examples/kafka/src/test/java/org/apache/apex/examples/kafka/hdfs2kafka/ApplicationTest.java
+++ b/examples/kafka/src/test/java/org/apache/apex/examples/kafka/hdfs2kafka/ApplicationTest.java
@@ -33,6 +33,7 @@ import org.slf4j.LoggerFactory;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.net.NetUtils;
import com.datatorrent.api.LocalMode;
@@ -51,8 +52,8 @@ public class ApplicationTest
private static final String directory = "target/hdfs2kafka";
private static final String FILE_NAME = "messages.txt";
- private static final int zkPort = 2181;
- private static final int brokerPort = 9092;
+ private static final int zkPort = NetUtils.getFreeSocketPort();
+ private static final int brokerPort = NetUtils.getFreeSocketPort();
private static final String BROKER = "localhost:" + brokerPort;
//private static final String FILE_PATH = FILE_DIR + "/" + FILE_NAME + ".0"; // first part
@@ -119,6 +120,9 @@ public class ApplicationTest
Configuration conf = new Configuration(false);
conf.addResource(this.getClass().getResourceAsStream("/META-INF/properties-hdfs2kafka.xml"));
conf.set("dt.operator.lines.prop.directory", directory);
+ conf.set("dt.operator.kafkaOutput.prop.producerProperties",
+ "serializer.class=kafka.serializer.StringEncoder,producer.type=async,metadata.broker.list=" + BROKER);
+
return conf;
}
diff --git a/examples/kafka/src/test/java/org/apache/apex/examples/kafka/kafka2hdfs/ApplicationTest.java b/examples/kafka/src/test/java/org/apache/apex/examples/kafka/kafka2hdfs/ApplicationTest.java
index be38c53..b5b83b7 100644
--- a/examples/kafka/src/test/java/org/apache/apex/examples/kafka/kafka2hdfs/ApplicationTest.java
+++ b/examples/kafka/src/test/java/org/apache/apex/examples/kafka/kafka2hdfs/ApplicationTest.java
@@ -33,6 +33,7 @@ import org.slf4j.LoggerFactory;
import org.apache.apex.malhar.kafka.AbstractKafkaInputOperator;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.net.NetUtils;
import com.datatorrent.api.LocalMode;
@@ -50,11 +51,11 @@ public class ApplicationTest
private static final Logger LOG = LoggerFactory.getLogger(ApplicationTest.class);
private static final String TOPIC = "kafka2hdfs";
- private static final int zkPort = 2181;
- private static final int brokerPort = 9092;
+ private static final int zkPort = NetUtils.getFreeSocketPort();
+ private static final int brokerPort = NetUtils.getFreeSocketPort();
private static final String BROKER = "localhost:" + brokerPort;
private static final String FILE_NAME = "test";
- private static final String FILE_DIR = "/tmp/FromKafka";
+ private static final String FILE_DIR = "./target/tmp/FromKafka";
private static final String FILE_PATH = FILE_DIR + "/" + FILE_NAME + ".0"; // first part
// test messages
--
To stop receiving notification emails like this one, please contact
['"commits@apex.apache.org" <co...@apex.apache.org>'].