You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/08/06 18:11:18 UTC

[GitHub] [kafka] mjsax commented on a change in pull request #9131: KAFKA-10367: Allow running the Streams demo app with a config file

mjsax commented on a change in pull request #9131:
URL: https://github.com/apache/kafka/pull/9131#discussion_r466596054



##########
File path: streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountDemo.java
##########
@@ -47,18 +49,23 @@
     public static final String INPUT_TOPIC = "streams-plaintext-input";
     public static final String OUTPUT_TOPIC = "streams-wordcount-output";
 
-    static Properties getStreamsConfig() {
+    static Properties getStreamsConfig(final String[] args) throws IOException {
         final Properties props = new Properties();
-        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordcount");
-        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
-        props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
-        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
-        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
+        if (args != null && args.length == 1) {
+            try (FileInputStream fis = new FileInputStream(args[0])) {

Review comment:
       I think this must be `final`, too

##########
File path: streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountTransformerDemo.java
##########
@@ -120,16 +122,19 @@ public void close() {}
         }
     }
 
-    public static void main(final String[] args) {
+    public static void main(final String[] args) throws IOException {
         final Properties props = new Properties();
-        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordcount-transformer");
-        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
-        props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
-        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
-        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
+        if (args != null && args.length == 1) {
+            props.load(new FileInputStream(args[0]));

Review comment:
       as above

##########
File path: streams/examples/src/test/java/org/apache/kafka/streams/examples/wordcount/WordCountDemoTest.java
##########
@@ -47,11 +48,11 @@
     private TestOutputTopic<String, Long> outputTopic;
 
     @Before
-    public void setup() {
+    public void setup() throws IOException {
         final StreamsBuilder builder = new StreamsBuilder();
         //Create Actual Stream Processing pipeline
         WordCountDemo.createWordCountStream(builder);
-        testDriver = new TopologyTestDriver(builder.build(), WordCountDemo.getStreamsConfig());
+        testDriver = new TopologyTestDriver(builder.build(), WordCountDemo.getStreamsConfig(new String[] {}));

Review comment:
       Nit: We can just pass in `null` ?

##########
File path: streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountDemo.java
##########
@@ -47,18 +49,23 @@
     public static final String INPUT_TOPIC = "streams-plaintext-input";
     public static final String OUTPUT_TOPIC = "streams-wordcount-output";
 
-    static Properties getStreamsConfig() {
+    static Properties getStreamsConfig(final String[] args) throws IOException {
         final Properties props = new Properties();
-        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordcount");
-        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
-        props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
-        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
-        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
+        if (args != null && args.length == 1) {

Review comment:
       For `args.length > 1` should we log a warning that some parameters are ignored and still use the first parameter as file name? -- Just ignoring all arguments silently (if there is more than one) seems not ideal?

##########
File path: streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java
##########
@@ -101,16 +103,20 @@ public void close() {}
         }
     }
 
-    public static void main(final String[] args) {
+    public static void main(final String[] args) throws IOException {
         final Properties props = new Properties();
-        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordcount-processor");
-        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
-        props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
-        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
-        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
+        if (args != null && args.length == 1) {
+            props.load(new FileInputStream(args[0]));

Review comment:
       Missing `try` block to close the `FileInputStream` properly




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org