Posted to user@spark.apache.org by Hafsa Asif <ha...@matchinguu.com> on 2015/07/21 17:23:16 UTC

Convert Simple Kafka Consumer to standalone Spark JavaStream Consumer

Hi, I have a simple high-level Kafka consumer like this:
package matchinguu.kafka.consumer;


import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

import java.util.*;

public class SimpleHLConsumer {

    private final ConsumerConnector consumer;
    private final String topic;

    public SimpleHLConsumer(String zookeeper, String groupId, String topic) {
        Properties props = new Properties();
        props.put("zookeeper.connect", zookeeper);
        props.put("group.id", groupId);
        props.put("zookeeper.session.timeout.ms", "500");
        props.put("zookeeper.sync.time.ms", "250");
        props.put("auto.commit.interval.ms", "1000");


        consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
        this.topic = topic;
    }

    public void testConsumer() {
        Map<String, Integer> topicCount = new HashMap<String, Integer>();
        topicCount.put(topic, 1);

        Map<String, List<KafkaStream<byte[], byte[]>>> consumerStreams =
                consumer.createMessageStreams(topicCount);
        List<KafkaStream<byte[], byte[]>> streams = consumerStreams.get(topic);
        for (final KafkaStream<byte[], byte[]> stream : streams) {

            ConsumerIterator<byte[], byte[]> it = stream.iterator();
            while (it.hasNext()) {
                System.out.println("------------");
                System.out.println("Message from Single Topic: "
                        + new String(it.next().message()));
            }
        }
        if (consumer != null) {
            System.out.println("Shutdown Happens");
            consumer.shutdown();
        }

    }

    public static void main(String[] args) {
        System.out.println("Consumer is now reading messages from producer");
        //String topic = args[0];
        String topic = "test";
        SimpleHLConsumer simpleHLConsumer =
                new SimpleHLConsumer("localhost:2181", "testgroup", topic);
        simpleHLConsumer.testConsumer();
   }

}

I want to receive my messages through Spark Streaming (Java API) with the
Kafka integration. Can anyone help me rework this code so that I get the same
output with the Spark Kafka integration?



--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Convert-Simple-Kafka-Consumer-to-standalone-Spark-JavaStream-Consumer-tp23930.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
For additional commands, e-mail: user-help@spark.apache.org


Re: Convert Simple Kafka Consumer to standalone Spark JavaStream Consumer

Posted by Tathagata Das <td...@databricks.com>.
From what I understand about your code, it is getting data from different
partitions of a topic - get all data from partition 1, then from partition
2, etc. However, you have configured it to read from just one partition
(topicCount has count = 1), so I am not sure what your intention is: to read
all partitions serially or in parallel.

If you want to get started with Kafka + Spark Streaming, I strongly suggest
reading the Kafka integration guide:
https://spark.apache.org/docs/latest/streaming-kafka-integration.html
and running the examples for the two approaches:
- https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/KafkaWordCount.scala
- https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/DirectKafkaWordCount.scala

Since you understand the high-level consumer idea, you may want to start
with the first, receiver-based approach, which also uses the HL consumer and
takes topicCounts.
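
As a rough illustration of the topicCounts idea, here is a minimal sketch in
plain Java that builds the topic-to-thread-count map, using the same values
as the consumer in the question (topic "test", count 1). The class
TopicCounts and its parse method are hypothetical names invented for this
sketch; they are not part of Kafka or Spark. The commented-out lines at the
end show, as an assumption to be checked against the integration guide for
your Spark version, how such a map would typically be passed to the
receiver-based stream once the spark-streaming-kafka artifact is on the
classpath.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper (not part of Kafka or Spark): builds the
// topic -> consumer-thread-count map that the receiver-based approach takes.
class TopicCounts {

    // Parses a comma-separated topic list, e.g. "test" or "a,b", and assigns
    // the same thread count to every non-empty topic name.
    static Map<String, Integer> parse(String topics, int numThreads) {
        if (numThreads < 1) {
            throw new IllegalArgumentException("numThreads must be >= 1");
        }
        Map<String, Integer> topicMap = new HashMap<String, Integer>();
        for (String topic : topics.split(",")) {
            String trimmed = topic.trim();
            if (!trimmed.isEmpty()) {
                topicMap.put(trimmed, numThreads);
            }
        }
        return topicMap;
    }

    public static void main(String[] args) {
        // Same values as the original consumer: topic "test", one thread.
        Map<String, Integer> topicMap = parse("test", 1);
        System.out.println(topicMap); // prints {test=1}

        // Assumption: with spark-streaming-kafka available and a
        // JavaStreamingContext named jssc already created, the map would be
        // handed to the receiver-based stream roughly like this:
        //
        //   JavaPairReceiverInputDStream<String, String> messages =
        //       KafkaUtils.createStream(jssc, "localhost:2181", "testgroup", topicMap);
        //   messages.print();
        //   jssc.start();
        //   jssc.awaitTermination();
    }
}
```

The ZooKeeper address and group id in the comment are the ones from the
original consumer; the map plays the same role as topicCount there.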

