Posted to user@spark.apache.org by "Gutwein, Sebastian" <gu...@mail.hs-ulm.de> on 2017/02/10 10:08:20 UTC

Write JavaDStream to Kafka (how?)

Hi,


I'm new to Spark Streaming and want to run some end-to-end tests with Spark and Kafka.

My program runs, but nothing arrives at the Kafka topic. Can someone please help me?

Where is my mistake? Does anyone have a running example of writing a DStream to Kafka 0.10.1.0?


The program looks as follows:

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.function.*;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;
import scala.Tuple2;

import java.util.*;
import java.util.regex.Pattern;

/**
 * Consumes messages from one or more topics in Kafka and does wordcount.
 *
 * Usage: JavaKafkaWordCount <zkQuorum> <group> <topics> <numThreads>
 *   <zkQuorum> is a list of one or more ZooKeeper servers that make the quorum
 *   <group> is the name of the Kafka consumer group
 *   <topics> is a list of one or more Kafka topics to consume from
 *   <numThreads> is the number of threads the Kafka consumer should use
 *
 * To run this example:
 *   `$ bin/run-example org.apache.spark.examples.streaming.JavaKafkaWordCount \
 *    zoo01,zoo02,zoo03 my-consumer-group topic1,topic2 1`
 */

public final class JavaKafkaWordCountTest {
  private static final Pattern SPACE = Pattern.compile(" ");

  private JavaKafkaWordCountTest() {
  }

  public static void main(String[] args) throws Exception {
    if (args.length < 4) {
      System.err.println("Usage: JavaKafkaWordCount <zkQuorum> <group> <topics> <numThreads>");
      System.exit(1);
    }

    SparkConf sparkConf = new SparkConf().setAppName("GutweinKafkaWordCount");
    // Create the context with a 2-second batch interval
    JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(2000));

    int numThreads = Integer.parseInt(args[3]);
    Map<String, Integer> topicMap = new HashMap<>();
    String[] topics = args[2].split(",");
    for (String topic: topics) {
      topicMap.put(topic, numThreads);
    }

    final JavaPairReceiverInputDStream<String, String> messages =
            KafkaUtils.createStream(jssc, args[0], args[1], topicMap);

    JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() {
      @Override
      public String call(Tuple2<String, String> tuple2) {
        return tuple2._2();
      }
    });

    JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
      @Override
      public Iterator<String> call(String x) {
        return Arrays.asList(SPACE.split(x)).iterator();
      }
    });

    JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
      new PairFunction<String, String, Integer>() {
        @Override
        public Tuple2<String, Integer> call(String s) {
          return new Tuple2<>(s, 1);
        }
      }).reduceByKey(new Function2<Integer, Integer, Integer>() {
        @Override
        public Integer call(Integer i1, Integer i2) {
          return i1 + i2;
        }
      });

    final KafkaWriter writer = new KafkaWriter("localhost:9081");

    wordCounts.foreachRDD(new VoidFunction<JavaPairRDD<String, Integer>>() {
        @Override
        public void call(JavaPairRDD<String, Integer> stringIntegerJavaPairRDD) throws Exception {
            writer.writeToTopic("output", stringIntegerJavaPairRDD.toString());
        }
    });

    wordCounts.print();
    jssc.start();
    jssc.awaitTermination();
  }

  public static class KafkaWriter {
    Properties props = new Properties();
    KafkaProducer<String, String> producer;

    // Constructor
    KafkaWriter(String bootstrapServer) {
      props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
      props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
      props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
      producer = new KafkaProducer<String, String>(props);
    }


    private void writeToTopic(String topicName, String message) {
      ProducerRecord<String, String> record = new ProducerRecord<>(topicName, message);
      producer.send(record);
    }

  }

}


Re: Write JavaDStream to Kafka (how?)

Posted by Cody Koeninger <co...@koeninger.org>.
It looks like you're creating a Kafka producer on the driver and
attempting to write the string representation of
stringIntegerJavaPairRDD. Instead, you probably want to call
stringIntegerJavaPairRDD.foreachPartition, so that producing to Kafka
happens on the executors.

Read

https://spark.apache.org/docs/latest/streaming-programming-guide.html#design-patterns-for-using-foreachrdd
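
Something like the following sketch shows the shape of it (untested; it
just carries over the serializer settings, the "output" topic and the
localhost:9081 broker from your KafkaWriter). The producer is created
inside foreachPartition, so it runs on the executors rather than the
driver:

wordCounts.foreachRDD(new VoidFunction<JavaPairRDD<String, Integer>>() {
  @Override
  public void call(JavaPairRDD<String, Integer> rdd) throws Exception {
    rdd.foreachPartition(new VoidFunction<Iterator<Tuple2<String, Integer>>>() {
      @Override
      public void call(Iterator<Tuple2<String, Integer>> partition) throws Exception {
        // Executor-side producer. For real use, reuse a pooled or
        // lazily initialized instance rather than one per partition.
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9081");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.StringSerializer");
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        try {
          while (partition.hasNext()) {
            Tuple2<String, Integer> wordCount = partition.next();
            // One record per (word, count) pair, not the RDD's toString()
            producer.send(new ProducerRecord<>("output",
                wordCount._1(), wordCount._2().toString()));
          }
        } finally {
          producer.close();
        }
      }
    });
  }
});

Creating a producer for every partition of every batch is expensive;
the design patterns section linked above shows how to amortize that
with a connection pool or a lazily initialized per-JVM instance.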

On Fri, Feb 10, 2017 at 4:08 AM, Gutwein, Sebastian
<gu...@mail.hs-ulm.de> wrote:
> Hi,
>
> I'm new to Spark Streaming and want to run some end-to-end tests with Spark
> and Kafka.
>
> My program runs, but nothing arrives at the Kafka topic. Can someone
> please help me?
>
> [program code snipped, see above]
