Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/08/06 09:35:12 UTC

[GitHub] [kafka] mimaison opened a new pull request #9131: KAFKA-10367: Allow running the Streams demo app with a config file

mimaison opened a new pull request #9131:
URL: https://github.com/apache/kafka/pull/9131


   Update the three WordCount demos to accept a configuration file. I kept the changes to a minimum, since the point of these samples is not to add a lot of logic for parsing arguments and handling usage errors.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] mjsax commented on pull request #9131: KAFKA-10367: Allow running the Streams demo app with a config file

Posted by GitBox <gi...@apache.org>.
mjsax commented on pull request #9131:
URL: https://github.com/apache/kafka/pull/9131#issuecomment-670039989


   Checkstyle error:
   ```
   > Task :streams:examples:checkstyleMain FAILED
   02:46:32 [ant:checkstyle] [ERROR] /home/jenkins/jenkins-slave/workspace/kafka-pr-jdk11-scala2.13/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountDemo.java:52:49: Variable 'args' should be declared final. [FinalLocalVariable]
   ```





[GitHub] [kafka] mimaison merged pull request #9131: KAFKA-10367: Allow running the Streams demo app with a config file

Posted by GitBox <gi...@apache.org>.
mimaison merged pull request #9131:
URL: https://github.com/apache/kafka/pull/9131


   





[GitHub] [kafka] mjsax commented on a change in pull request #9131: KAFKA-10367: Allow running the Streams demo app with a config file

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #9131:
URL: https://github.com/apache/kafka/pull/9131#discussion_r466596054



##########
File path: streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountDemo.java
##########
@@ -47,18 +49,23 @@
     public static final String INPUT_TOPIC = "streams-plaintext-input";
     public static final String OUTPUT_TOPIC = "streams-wordcount-output";
 
-    static Properties getStreamsConfig() {
+    static Properties getStreamsConfig(final String[] args) throws IOException {
         final Properties props = new Properties();
-        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordcount");
-        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
-        props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
-        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
-        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
+        if (args != null && args.length == 1) {
+            try (FileInputStream fis = new FileInputStream(args[0])) {

Review comment:
       I think this must be `final`, too

##########
File path: streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountTransformerDemo.java
##########
@@ -120,16 +122,19 @@ public void close() {}
         }
     }
 
-    public static void main(final String[] args) {
+    public static void main(final String[] args) throws IOException {
         final Properties props = new Properties();
-        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordcount-transformer");
-        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
-        props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
-        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
-        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
+        if (args != null && args.length == 1) {
+            props.load(new FileInputStream(args[0]));

Review comment:
       as above

##########
File path: streams/examples/src/test/java/org/apache/kafka/streams/examples/wordcount/WordCountDemoTest.java
##########
@@ -47,11 +48,11 @@
     private TestOutputTopic<String, Long> outputTopic;
 
     @Before
-    public void setup() {
+    public void setup() throws IOException {
         final StreamsBuilder builder = new StreamsBuilder();
         //Create Actual Stream Processing pipeline
         WordCountDemo.createWordCountStream(builder);
-        testDriver = new TopologyTestDriver(builder.build(), WordCountDemo.getStreamsConfig());
+        testDriver = new TopologyTestDriver(builder.build(), WordCountDemo.getStreamsConfig(new String[] {}));

Review comment:
       Nit: Can we just pass in `null`?

##########
File path: streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountDemo.java
##########
@@ -47,18 +49,23 @@
     public static final String INPUT_TOPIC = "streams-plaintext-input";
     public static final String OUTPUT_TOPIC = "streams-wordcount-output";
 
-    static Properties getStreamsConfig() {
+    static Properties getStreamsConfig(final String[] args) throws IOException {
         final Properties props = new Properties();
-        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordcount");
-        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
-        props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
-        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
-        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
+        if (args != null && args.length == 1) {

Review comment:
       For `args.length > 1`, should we log a warning that some parameters are ignored and still use the first parameter as the file name? Silently ignoring all arguments (if there is more than one) does not seem ideal.
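
The behaviour suggested in this comment could be sketched roughly as follows. This is only an illustration, not the code from the PR: the class name `ArgsCheckSketch`, the helper `resolveConfigPath`, and the use of `System.err` instead of a logger are all assumptions made for the example.

```java
public class ArgsCheckSketch {

    // Hypothetical helper: returns the config file path taken from the first
    // argument, or null when no argument was supplied.
    public static String resolveConfigPath(final String[] args) {
        if (args == null || args.length == 0) {
            return null;
        }
        if (args.length > 1) {
            // Assumption: a plain stderr message stands in for a real logger.
            System.err.println("Warning: expected at most one argument (the config file); ignoring "
                + (args.length - 1) + " extra argument(s)");
        }
        return args[0];
    }

    public static void main(final String[] args) {
        final String path = resolveConfigPath(args);
        System.out.println(path == null
            ? "No config file given, using built-in defaults"
            : "Loading config from " + path);
    }
}
```

With this shape, both `null` and an empty array fall through to the defaults, so a test can pass either one.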

##########
File path: streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java
##########
@@ -101,16 +103,20 @@ public void close() {}
         }
     }
 
-    public static void main(final String[] args) {
+    public static void main(final String[] args) throws IOException {
         final Properties props = new Properties();
-        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordcount-processor");
-        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
-        props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
-        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
-        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
+        if (args != null && args.length == 1) {
+            props.load(new FileInputStream(args[0]));

Review comment:
       Missing `try` block to close the `FileInputStream` properly
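
A minimal sketch of what this comment and the earlier `final` remarks ask for: the stream is opened in a try-with-resources statement so it is closed even if `load` throws, and the resource variable is declared `final` to satisfy the FinalLocalVariable check. The class name `LoadPropsSketch`, the method `loadConfig`, and the choice of `putIfAbsent` for the defaults are assumptions for illustration. The literal keys are the string values of `StreamsConfig.APPLICATION_ID_CONFIG` and `StreamsConfig.BOOTSTRAP_SERVERS_CONFIG`, written out so the example compiles without the Kafka dependency.

```java
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;

public class LoadPropsSketch {

    // Hypothetical helper: loads the file at 'path' (if any) and then fills in
    // the demo defaults for keys the file did not set.
    public static Properties loadConfig(final String path) throws IOException {
        final Properties props = new Properties();
        if (path != null) {
            // try-with-resources closes the stream on both success and failure.
            try (final InputStream in = new FileInputStream(path)) {
                props.load(in);
            }
        }
        // Values from the file win; defaults apply only to missing keys.
        props.putIfAbsent("application.id", "streams-wordcount");
        props.putIfAbsent("bootstrap.servers", "localhost:9092");
        return props;
    }

    public static void main(final String[] args) throws IOException {
        final String path = args != null && args.length > 0 ? args[0] : null;
        System.out.println(loadConfig(path));
    }
}
```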







[GitHub] [kafka] mimaison commented on pull request #9131: KAFKA-10367: Allow running the Streams demo app with a config file

Posted by GitBox <gi...@apache.org>.
mimaison commented on pull request #9131:
URL: https://github.com/apache/kafka/pull/9131#issuecomment-675713485


   @mjsax Can you take another look? Thanks





[GitHub] [kafka] mimaison commented on pull request #9131: KAFKA-10367: Allow running the Streams demo app with a config file

Posted by GitBox <gi...@apache.org>.
mimaison commented on pull request #9131:
URL: https://github.com/apache/kafka/pull/9131#issuecomment-670634953


   Thanks @mjsax, I wasn't expecting a review this quickly! Sorry for not fully checking the changes before opening the PR.

