Posted to dev@spark.apache.org by Ratika Prasad <rp...@couponsinc.com> on 2015/09/27 06:50:22 UTC

Spark-Kafka Connector issue

Hi All,

I am trying out Spark Streaming, reading messages from Kafka topics that will later be turned into streams, as shown below. Kafka is set up on a VM and the topics are created. However, when I run the program below from my Spark VM, I get an error even though the Kafka server and ZooKeeper are up and running.

./bin/spark-submit --class org.stream.processing.JavaKafkaStreamEventProcessing --master local spark-stream-processing-0.0.1-SNAPSHOT-jar-with-dependencies.jar 172.28.161.32:2181 redemption_inbound

Exception in thread "main" org.apache.spark.SparkException: java.io.EOFException: Received -1 when reading from channel, socket has likely been closed.
        at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
        at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
        at scala.util.Either.fold(Either.scala:97)
        at org.apache.spark.streaming.kafka.KafkaCluster$.checkErrors(KafkaCluster.scala:365)
        at org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:422)
        at org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:532)
        at org.apache.spark.streaming.kafka.KafkaUtils.createDirectStream(KafkaUtils.scala)
        at org.stream.processing.JavaKafkaStreamEventProcessing.main(JavaKafkaStreamEventProcessing.java:52)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:497)
        at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:665)
        at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:170)
        at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:193)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:112)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

Program

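// Note: the package, imports, class declaration and SPACE constant were not part of
// the original post; they are reconstructed from the identifiers used below and from
// the spark-submit command. SPACE is assumed to split on a single space.
package org.stream.processing;

import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.regex.Pattern;

import com.google.common.collect.Lists;
import kafka.serializer.StringDecoder;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaPairInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;
import scala.Tuple2;

public final class JavaKafkaStreamEventProcessing {
  private static final Pattern SPACE = Pattern.compile(" ");

  public static void main(String[] args) {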
    if (args.length < 2) {
      System.err.println("Usage: DirectKafkaWordCount <brokers> <topics>\n" +
          "  <brokers> is a list of one or more Kafka brokers\n" +
          "  <topics> is a list of one or more kafka topics to consume from\n\n");
      System.exit(1);
    }

    String brokers = args[0];
    String topics = args[1];

    // Create context with 2 second batch interval
    SparkConf sparkConf = new SparkConf().setAppName("JavaKafkaStreamEventProcessing");
    JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.seconds(2));

    HashSet<String> topicsSet = new HashSet<String>(Arrays.asList(topics.split(",")));
    HashMap<String, String> kafkaParams = new HashMap<String, String>();
    kafkaParams.put("metadata.broker.list", brokers);

    // Create direct kafka stream with brokers and topics
    JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(
        jssc,
        String.class,
        String.class,
        StringDecoder.class,
        StringDecoder.class,
        kafkaParams,
        topicsSet
    );

    // Get the lines, split them into words, count the words and print
    JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() {
      public String call(Tuple2<String, String> tuple2) {
        return tuple2._2();
      }
    });
    JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
      public Iterable<String> call(String x) {
        return Lists.newArrayList(SPACE.split(x));
      }
    });
    JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
      new PairFunction<String, String, Integer>() {
        public Tuple2<String, Integer> call(String s) {
          return new Tuple2<String, Integer>(s, 1);
        }
      }).reduceByKey(
        new Function2<Integer, Integer, Integer>() {
        public Integer call(Integer i1, Integer i2) {
          return i1 + i2;
        }
      });
    wordCounts.print();
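    // Note: this prints the DStream object's toString(), not the counts themselves;
    // the per-batch counts are emitted by wordCounts.print() above.
    System.out.println("Word Counts are : " + wordCounts.toString());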

    // Start the computation
    jssc.start();
    jssc.awaitTermination();
  }
}

Re: Spark-Kafka Connector issue

Posted by Cody Koeninger <co...@koeninger.org>.
Show the output of bin/kafka-topics.sh --list.  Show the actual code with
the topic name hardcoded in the set, not loaded from an external file you
didn't show.  Show the full stacktrace you're getting.
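
As a minimal sketch of what "hardcoded" inputs could look like, the snippet below builds the two collections that createDirectStream receives in the program above, using the broker address and topic name that appear elsewhere in this thread. The class and method names (HardcodedKafkaInputs, kafkaParams, topics) are hypothetical and only for illustration; the Spark call itself is left out so the snippet runs with the JDK alone.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical helper: builds the inputs for createDirectStream with
// literal values instead of command-line arguments or external files.
final class HardcodedKafkaInputs {

    // Broker list as a literal "host:port" (values taken from this thread).
    static Map<String, String> kafkaParams() {
        Map<String, String> params = new HashMap<>();
        params.put("metadata.broker.list", "172.28.161.32:9092");
        return params;
    }

    // Topic set containing exactly one literal topic name.
    static Set<String> topics() {
        return new HashSet<>(Collections.singleton("TestTopic"));
    }

    public static void main(String[] args) {
        // Print exactly what would be handed to createDirectStream.
        System.out.println("kafkaParams = " + kafkaParams());
        System.out.println("topics      = " + topics());
    }
}
```

Printing the values right before the createDirectStream call makes it easy to compare the topic name character by character against the output of bin/kafka-topics.sh --list.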

On Mon, Sep 28, 2015 at 10:03 PM, Ratika Prasad <rp...@couponsinc.com>
wrote:

> Yes, the queues are created and get listed as well, and I have posted a few
> messages, which I am able to read using Kafka-consumer.sh --from-beginning.
> However, Spark fails with "No leader offset for Set".
>
> I tried changing offsets.storage from zookeeper to kafka.
>
> Kindly help.
>
> _____________________________
> From: Cody Koeninger <co...@koeninger.org>
> Sent: Tuesday, September 29, 2015 12:33 am
> Subject: Re: Spark-Kafka Connector issue
> To: Ratika Prasad <rp...@couponsinc.com>
> Cc: <us...@spark.apache.org>
>
>
>
> Did you actually create TestTopic?  See if it shows up using
> bin/kafka-topics.sh --list, and if not, create it using bin/kafka-topics.sh
> --create

RE: Spark-Kafka Connector issue

Posted by Ratika Prasad <rp...@couponsinc.com>.
Thanks for your reply.

I invoked my program with the broker IP and port, and it started as expected, but I see the error below.

./bin/spark-submit --class org.stream.processing.JavaKafkaStreamEventProcessing --master local spark-stream-processing-0.0.1-SNAPSHOT-jar-with-dependencies.jar 172.28.161.32:9092 TestTopic
15/09/28 17:45:09 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
15/09/28 17:45:11 WARN StreamingContext: spark.master should be set as local[n], n > 1 in local mode if you have receivers to get data, otherwise Spark jobs will not get resources to process the received data.
Exception in thread "main" org.apache.spark.SparkException: java.nio.channels.ClosedChannelException
org.apache.spark.SparkException: Couldn't find leader offsets for Set([TestTopic,0])
        at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
        at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
        at scala.util.Either.fold(Either.scala:97)
        at org.apache.spark.streaming.kafka.KafkaCluster$.checkErrors(KafkaCluster.scala:365)
        at org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:422)
        at org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:532)
        at org.apache.spark.streaming.kafka.KafkaUtils.createDirectStream(KafkaUtils.scala)
        at org.stream.processing.JavaKafkaStreamEventProcessing.main(JavaKafkaStreamEventProcessing.java:52)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:497)
        at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:665)
        at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:170)
        at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:193)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:112)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

When I ran the command below to check the offsets, I got this:

bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --topic TestTopic --group test-consumer-group --zookeeper localhost:2181
Exiting due to: org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = NoNode for /consumers/test-consumer-group/offsets/TestTopic /0.

I also added the configs below to my kafka/config/consumer.properties and restarted Kafka:

auto.offset.reset=smallest
offsets.storage=zookeeper
offsets.channel.backoff.ms=1000
offsets.channel.socket.timeout.ms=10000
offsets.commit.max.retries=5
dual.commit.enabled=true



Re: Spark-Kafka Connector issue

Posted by Cody Koeninger <co...@koeninger.org>.
This is a user list question not a dev list question.

Looks like your driver is having trouble communicating with the kafka
brokers.  Make sure the broker host and port are reachable from the driver
host (using nc or telnet); make sure that you're providing the _broker_
host and port to createDirectStream, not the zookeeper host; make sure the
topics in question actually exist on kafka and the names match what you're
providing to createDirectStream.
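
The first two checks can also be scripted. The following is a rough sketch, not part of Spark or Kafka: it parses each host:port entry of the broker list, warns when the port is 2181 (the conventional ZooKeeper client port, as opposed to 9092 for a Kafka broker), and attempts a plain TCP connection, much like nc or telnet would. The class name BrokerAddressCheck, its method names and the 2-second timeout are assumptions made for illustration.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical pre-flight check for the value passed as metadata.broker.list.
final class BrokerAddressCheck {

    // Conventional ZooKeeper client port; Kafka brokers conventionally use 9092.
    static final int ZOOKEEPER_DEFAULT_PORT = 2181;

    // Extracts the port from a "host:port" entry, rejecting malformed input.
    static int portOf(String hostPort) {
        int idx = hostPort.lastIndexOf(':');
        if (idx <= 0 || idx == hostPort.length() - 1) {
            throw new IllegalArgumentException("Expected host:port but got: " + hostPort);
        }
        return Integer.parseInt(hostPort.substring(idx + 1).trim());
    }

    // Extracts the host from a "host:port" entry.
    static String hostOf(String hostPort) {
        portOf(hostPort); // validates the format first
        return hostPort.substring(0, hostPort.lastIndexOf(':')).trim();
    }

    // Heuristic only: a broker could be configured on 2181, but that is unusual.
    static boolean looksLikeZooKeeper(String hostPort) {
        return portOf(hostPort) == ZOOKEEPER_DEFAULT_PORT;
    }

    // Returns true if a TCP connection can be opened within the timeout.
    static boolean isReachable(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Default value is the broker address used later in this thread.
        String brokers = args.length > 0 ? args[0] : "172.28.161.32:9092";
        for (String entry : brokers.split(",")) {
            if (looksLikeZooKeeper(entry)) {
                System.err.println(entry + " uses port 2181, which is usually ZooKeeper, not a Kafka broker");
            }
            boolean ok = isReachable(hostOf(entry), portOf(entry), 2000);
            System.out.println(entry + (ok ? " is reachable" : " is NOT reachable"));
        }
    }
}
```

Run it from the driver host with the same string you pass as the first program argument. A successful TCP connection only shows that something is listening on that port; it does not prove that the listener is a Kafka broker or that the topic exists.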




