You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ko...@apache.org on 2018/07/05 14:06:38 UTC

spark git commit: [SPARK-24743][EXAMPLES] Update the JavaDirectKafkaWordCount example to support the new Kafka API

Repository: spark
Updated Branches:
  refs/heads/master 7bd6d5412 -> ac78bcce0


[SPARK-24743][EXAMPLES] Update the JavaDirectKafkaWordCount example to support the new Kafka API

## What changes were proposed in this pull request?

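Add the required Kafka consumer configs (bootstrap servers, consumer group ID, and key/value deserializers) to the JavaDirectKafkaWordCount class, and add a `<groupId>` command-line argument.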

## How was this patch tested?

Manually tested in local mode.

Author: cluo <05...@163.com>

Closes #21717 from cluo512/SPARK-24743-update-JavaDirectKafkaWordCount.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ac78bcce
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ac78bcce
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ac78bcce

Branch: refs/heads/master
Commit: ac78bcce00ff8ec8e5b7335c2807aa0cd0f5406a
Parents: 7bd6d54
Author: cluo <05...@163.com>
Authored: Thu Jul 5 09:06:25 2018 -0500
Committer: cody koeninger <co...@koeninger.org>
Committed: Thu Jul 5 09:06:25 2018 -0500

----------------------------------------------------------------------
 .../streaming/JavaDirectKafkaWordCount.java     | 24 +++++++++++++-------
 1 file changed, 16 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/ac78bcce/examples/src/main/java/org/apache/spark/examples/streaming/JavaDirectKafkaWordCount.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaDirectKafkaWordCount.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaDirectKafkaWordCount.java
index b6b163f..748bf58 100644
--- a/examples/src/main/java/org/apache/spark/examples/streaming/JavaDirectKafkaWordCount.java
+++ b/examples/src/main/java/org/apache/spark/examples/streaming/JavaDirectKafkaWordCount.java
@@ -26,7 +26,9 @@ import java.util.regex.Pattern;
 
 import scala.Tuple2;
 
+import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.serialization.StringDeserializer;
 
 import org.apache.spark.SparkConf;
 import org.apache.spark.streaming.api.java.*;
@@ -37,30 +39,33 @@ import org.apache.spark.streaming.Durations;
 
 /**
  * Consumes messages from one or more topics in Kafka and does wordcount.
- * Usage: JavaDirectKafkaWordCount <brokers> <topics>
+ * Usage: JavaDirectKafkaWordCount <brokers> <groupId> <topics>
  *   <brokers> is a list of one or more Kafka brokers
+ *   <groupId> is a consumer group name to consume from topics
  *   <topics> is a list of one or more kafka topics to consume from
  *
  * Example:
  *    $ bin/run-example streaming.JavaDirectKafkaWordCount broker1-host:port,broker2-host:port \
- *      topic1,topic2
+ *      consumer-group topic1,topic2
  */
 
 public final class JavaDirectKafkaWordCount {
   private static final Pattern SPACE = Pattern.compile(" ");
 
   public static void main(String[] args) throws Exception {
-    if (args.length < 2) {
-      System.err.println("Usage: JavaDirectKafkaWordCount <brokers> <topics>\n" +
-          "  <brokers> is a list of one or more Kafka brokers\n" +
-          "  <topics> is a list of one or more kafka topics to consume from\n\n");
+    if (args.length < 3) {
+      System.err.println("Usage: JavaDirectKafkaWordCount <brokers> <groupId> <topics>\n" +
+                         "  <brokers> is a list of one or more Kafka brokers\n" +
+                         "  <groupId> is a consumer group name to consume from topics\n" +
+                         "  <topics> is a list of one or more kafka topics to consume from\n\n");
       System.exit(1);
     }
 
     StreamingExamples.setStreamingLogLevels();
 
     String brokers = args[0];
-    String topics = args[1];
+    String groupId = args[1];
+    String topics = args[2];
 
     // Create context with a 2 seconds batch interval
     SparkConf sparkConf = new SparkConf().setAppName("JavaDirectKafkaWordCount");
@@ -68,7 +73,10 @@ public final class JavaDirectKafkaWordCount {
 
     Set<String> topicsSet = new HashSet<>(Arrays.asList(topics.split(",")));
     Map<String, Object> kafkaParams = new HashMap<>();
-    kafkaParams.put("metadata.broker.list", brokers);
+    kafkaParams.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
+    kafkaParams.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
+    kafkaParams.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+    kafkaParams.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
 
     // Create direct kafka stream with brokers and topics
     JavaInputDStream<ConsumerRecord<String, String>> messages = KafkaUtils.createDirectStream(


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org