Posted to users@kafka.apache.org by Adis Ababa <id...@gmail.com> on 2016/09/28 03:21:22 UTC

Spark per-topic number of partitions doubt

Hello,
I have also asked this question on Stack Overflow here:
http://stackoverflow.com/questions/39737201/spark-kafka-per-topic-number-of-partitions-map-not-honored

I am confused about the "per-topic number of partitions" parameter used when
creating an input DStream with the KafkaUtils.createStream(...) method.

I am pasting the question below; please help.

From the [Spark documentation][1]:

> The topicMap parameter of the KafkaUtils.createStream(...) method determines
> the "per-topic number of Kafka partitions to consume" ([Javadoc here][2])

So I created a Kafka topic with 3 partitions and started a Spark receiver
as follows:

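    import java.util.HashMap;
    import java.util.Map;
    import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
    import org.apache.spark.streaming.kafka.KafkaUtils;

    // topic -> "number of partitions to consume" (as I read the docs)
    Map<String, Integer> topicMap = new HashMap<>();
    topicMap.put(topic, 1);
    JavaPairReceiverInputDStream<String, String> inputDStream =
        KafkaUtils.createStream(javaStreamingContext, zookeeperQuorum, groupId, topicMap);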

I expected this receiver to receive messages from ONLY one of the 3
partitions that I created. However, when I run the offset checker, I see
the following:

    Pid Offset          logSize         Lag             Owner
    0   9               9               0               none
    1   11              11              0               none
    2   7               7               0               none

I expected this code to receive messages from only one partition, and I
thought I would then need to start more receivers (one per partition), as
shown in the [documentation here][3], to cover all of the topic's partitions:

    int numStreams = 3;
    List<JavaPairDStream<String, String>> kafkaStreams = new ArrayList<>(numStreams);
    for (int i = 0; i < numStreams; i++) {
      kafkaStreams.add(KafkaUtils.createStream(...));
    }
    JavaPairDStream<String, String> unifiedStream =
        streamingContext.union(kafkaStreams.get(0), kafkaStreams.subList(1, kafkaStreams.size()));
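To make the two setups concrete, here is a small self-contained Java sketch of how a topic's partitions might be split across consumer threads. It rests on assumptions I have not verified against the Spark source: that the receiver-based createStream uses Kafka's high-level consumer, that each topicMap value becomes that many consumer threads for the topic inside one receiver, and that all threads sharing the same groupId split the partitions using Kafka's "range" assignment. RangeAssignmentSketch and assign are hypothetical names; this is not Spark or Kafka code.

    import java.util.ArrayList;
    import java.util.List;

    /**
     * Hypothetical sketch (not Spark or Kafka code): models a "range"-style
     * split of one topic's partitions among all consumer threads in a group.
     * Assumes numThreads > 0 and that threads are ordered by index.
     */
    public class RangeAssignmentSketch {

        /** Returns, for each thread index, the partition ids that thread would own. */
        public static List<List<Integer>> assign(int numPartitions, int numThreads) {
            List<List<Integer>> result = new ArrayList<>();
            int perThread = numPartitions / numThreads; // base share for every thread
            int extra = numPartitions % numThreads;     // the first 'extra' threads get one more
            for (int t = 0; t < numThreads; t++) {
                int start = perThread * t + Math.min(t, extra);
                int count = perThread + (t < extra ? 1 : 0);
                List<Integer> owned = new ArrayList<>();
                for (int p = start; p < start + count; p++) {
                    owned.add(p);
                }
                result.add(owned);
            }
            return result;
        }

        public static void main(String[] args) {
            // One receiver with topicMap.put(topic, 1): one thread in the group.
            System.out.println("1 thread:  " + assign(3, 1)); // [[0, 1, 2]]
            // Three receivers, each with topicMap value 1, same groupId: three threads.
            System.out.println("3 threads: " + assign(3, 3)); // [[0], [1], [2]]
        }
    }

If those assumptions hold, a single receiver with topicMap.put(topic, 1) is one thread that owns all three partitions, which would be consistent with the offset checker output above, while three such receivers in the same group would get one partition each.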

So my question is: can one receiver receive messages from all partitions?
If so, what in the world does the topicMap (topic -> numPartitions) mean?

  [1]: http://spark.apache.org/docs/latest/streaming-kafka-integration.html
  [2]: http://spark.apache.org/docs/latest/api/java/index.html?org/apache/spark/streaming/kafka/KafkaUtils.html
  [3]: http://spark.apache.org/docs/latest/streaming-programming-guide.html#input-dstreams-and-receivers