Posted to users@kafka.apache.org by Amit Saxena <am...@gmail.com> on 2016/02/11 10:09:44 UTC

Kangaroo project with KafkaInputFormat - MapReduce.

Hi Team,

Does anyone use the Kangaroo project, which uses Kafka's SimpleConsumer API to
integrate Kafka with MapReduce jobs? I am facing an issue with getting the
offsets for a specific consumer. Also, after relaunching the job, the committed
offsets for the consumer should be read so that the next InputSplit is
generated starting from that point.
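
To make it concrete, this is roughly how I would expect the committed offset
for a consumer group to be readable with the 0.8 SimpleConsumer API (just a
sketch - the broker host/port, topic, and group name below are placeholders,
and versionId 0 means the offsets are fetched from ZooKeeper):

import java.util.Collections;

import kafka.common.OffsetMetadataAndError;
import kafka.common.TopicAndPartition;
import kafka.javaapi.OffsetFetchRequest;
import kafka.javaapi.OffsetFetchResponse;
import kafka.javaapi.consumer.SimpleConsumer;

public class CommittedOffsetCheck {
    public static void main(String[] args) {
        // Placeholders - adjust to the real broker, topic, and consumer group.
        SimpleConsumer consumer =
                new SimpleConsumer("broker-host", 9092, 100000, 64 * 1024, "offset-check");
        try {
            TopicAndPartition tp = new TopicAndPartition("amit-topic", 0);
            // versionId 0 = offsets stored in ZooKeeper (the 0.8 consumer default)
            OffsetFetchRequest request = new OffsetFetchRequest(
                    "my-consumer-group", Collections.singletonList(tp), (short) 0, 1, "offset-check");
            OffsetFetchResponse response = consumer.fetchOffsets(request);
            OffsetMetadataAndError result = response.offsets().get(tp);
            System.out.println("committed offset = " + result.offset()
                    + ", error code = " + result.error());
        } finally {
            consumer.close();
        }
    }
}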

My environment is:
CDH - 5.5
Kafka - 0.8.2

In my case, one broker hosts a newly created topic with a single partition
containing 10 messages. When I launch my MapReduce job with Kangaroo's
KafkaInputFormat, I do not get any InputSplit, maybe because my offsets are
not being set or retrieved correctly.

Any clue what could be going wrong here?
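
For debugging, here is a minimal sketch of how I check which offsets the
broker actually reports for the partition, using the same SimpleConsumer API
(broker host/port and topic are placeholders):

import java.util.HashMap;
import java.util.Map;

import kafka.api.PartitionOffsetRequestInfo;
import kafka.common.TopicAndPartition;
import kafka.javaapi.OffsetResponse;
import kafka.javaapi.consumer.SimpleConsumer;

public class PartitionOffsetCheck {
    public static void main(String[] args) {
        // Placeholders - adjust to the real broker and topic.
        SimpleConsumer consumer =
                new SimpleConsumer("broker-host", 9092, 100000, 64 * 1024, "offset-check");
        try {
            TopicAndPartition tp = new TopicAndPartition("amit-topic", 0);
            long earliest = offsetBefore(consumer, tp, kafka.api.OffsetRequest.EarliestTime());
            long latest = offsetBefore(consumer, tp, kafka.api.OffsetRequest.LatestTime());
            // With 10 messages in the partition I would expect earliest = 0 and latest = 10.
            System.out.println("earliest = " + earliest + ", latest = " + latest);
        } finally {
            consumer.close();
        }
    }

    // Asks the broker for the single offset before the given timestamp
    // (EarliestTime/LatestTime are the special values -2 and -1).
    private static long offsetBefore(SimpleConsumer consumer, TopicAndPartition tp, long time) {
        Map<TopicAndPartition, PartitionOffsetRequestInfo> info =
                new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
        info.put(tp, new PartitionOffsetRequestInfo(time, 1));
        kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(
                info, kafka.api.OffsetRequest.CurrentVersion(), "offset-check");
        OffsetResponse response = consumer.getOffsetsBefore(request);
        if (response.hasError()) {
            throw new RuntimeException("error code = " + response.errorCode(tp.topic(), tp.partition()));
        }
        return response.offsets(tp.topic(), tp.partition())[0];
    }
}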

Below are the job details:

KafkaJobBuilder kafkaJobBuilder = KafkaJobBuilder.newBuilder();
kafkaJobBuilder.setJobName("amit-topic-kafka-hadoop");
kafkaJobBuilder.setZkConnect(zkString);
kafkaJobBuilder.addQueueInput(topicName, consumerGroup, KafkaReaderMap.class);
kafkaJobBuilder.setNumReduceTasks(0);
kafkaJobBuilder.setKafkaFetchSizeBytes(1000000);
kafkaJobBuilder.setMapOutputKeyClass(Text.class);
kafkaJobBuilder.setMapOutputValueClass(Text.class);
kafkaJobBuilder.setTextFileOutputFormat(outputPath);

// Configuration configuration = new Configuration();
getConf().setLong("kafka.timestamp.offset", System.currentTimeMillis());
getConf().setInt("kafka.max.splits.per.partition", Integer.MAX_VALUE);



Thanks
Amit