You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ko...@apache.org on 2018/07/05 14:06:38 UTC
spark git commit: [SPARK-24743][EXAMPLES] Update the
JavaDirectKafkaWordCount example to support the new API of kafka
Repository: spark
Updated Branches:
refs/heads/master 7bd6d5412 -> ac78bcce0
[SPARK-24743][EXAMPLES] Update the JavaDirectKafkaWordCount example to support the new API of kafka
## What changes were proposed in this pull request?
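Add the Kafka consumer configs required by the new Kafka API to the JavaDirectKafkaWordCount class: bootstrap servers, consumer group ID, and the key and value deserializers. The consumer group ID is supplied through a new `<groupId>` command-line argument.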
## How was this patch tested?
Manually tested in local mode.
Author: cluo <05...@163.com>
Closes #21717 from cluo512/SPARK-24743-update-JavaDirectKafkaWordCount.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ac78bcce
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ac78bcce
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ac78bcce
Branch: refs/heads/master
Commit: ac78bcce00ff8ec8e5b7335c2807aa0cd0f5406a
Parents: 7bd6d54
Author: cluo <05...@163.com>
Authored: Thu Jul 5 09:06:25 2018 -0500
Committer: cody koeninger <co...@koeninger.org>
Committed: Thu Jul 5 09:06:25 2018 -0500
----------------------------------------------------------------------
.../streaming/JavaDirectKafkaWordCount.java | 24 +++++++++++++-------
1 file changed, 16 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/ac78bcce/examples/src/main/java/org/apache/spark/examples/streaming/JavaDirectKafkaWordCount.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaDirectKafkaWordCount.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaDirectKafkaWordCount.java
index b6b163f..748bf58 100644
--- a/examples/src/main/java/org/apache/spark/examples/streaming/JavaDirectKafkaWordCount.java
+++ b/examples/src/main/java/org/apache/spark/examples/streaming/JavaDirectKafkaWordCount.java
@@ -26,7 +26,9 @@ import java.util.regex.Pattern;
import scala.Tuple2;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.api.java.*;
@@ -37,30 +39,33 @@ import org.apache.spark.streaming.Durations;
/**
* Consumes messages from one or more topics in Kafka and does wordcount.
- * Usage: JavaDirectKafkaWordCount <brokers> <topics>
+ * Usage: JavaDirectKafkaWordCount <brokers> <groupId> <topics>
* <brokers> is a list of one or more Kafka brokers
+ * <groupId> is a consumer group name to consume from topics
* <topics> is a list of one or more kafka topics to consume from
*
* Example:
* $ bin/run-example streaming.JavaDirectKafkaWordCount broker1-host:port,broker2-host:port \
- * topic1,topic2
+ * consumer-group topic1,topic2
*/
public final class JavaDirectKafkaWordCount {
private static final Pattern SPACE = Pattern.compile(" ");
public static void main(String[] args) throws Exception {
- if (args.length < 2) {
- System.err.println("Usage: JavaDirectKafkaWordCount <brokers> <topics>\n" +
- " <brokers> is a list of one or more Kafka brokers\n" +
- " <topics> is a list of one or more kafka topics to consume from\n\n");
+ if (args.length < 3) {
+ System.err.println("Usage: JavaDirectKafkaWordCount <brokers> <groupId> <topics>\n" +
+ " <brokers> is a list of one or more Kafka brokers\n" +
+ " <groupId> is a consumer group name to consume from topics\n" +
+ " <topics> is a list of one or more kafka topics to consume from\n\n");
System.exit(1);
}
StreamingExamples.setStreamingLogLevels();
String brokers = args[0];
- String topics = args[1];
+ String groupId = args[1];
+ String topics = args[2];
// Create context with a 2 seconds batch interval
SparkConf sparkConf = new SparkConf().setAppName("JavaDirectKafkaWordCount");
@@ -68,7 +73,10 @@ public final class JavaDirectKafkaWordCount {
Set<String> topicsSet = new HashSet<>(Arrays.asList(topics.split(",")));
Map<String, Object> kafkaParams = new HashMap<>();
- kafkaParams.put("metadata.broker.list", brokers);
+ kafkaParams.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
+ kafkaParams.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
+ kafkaParams.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+ kafkaParams.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
// Create direct kafka stream with brokers and topics
JavaInputDStream<ConsumerRecord<String, String>> messages = KafkaUtils.createDirectStream(
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org