Posted to user@spark.apache.org by "Afshartous, Nick" <na...@wbgames.com> on 2017/03/22 17:18:50 UTC

[ Spark Streaming & Kafka 0.10 ] Possible bug

Hi,

I think I'm seeing a bug while upgrading to the Kafka 0.10 streaming API.  Code fragments follow.
--
   Nick
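
For reference, getDirectKafkaStream() is just the standard 0.10 direct-stream setup, along the lines of the sketch below (broker, group, and topic values are placeholders, and jssc is the JavaStreamingContext):

    // Uses org.apache.spark.streaming.kafka010.{KafkaUtils, LocationStrategies,
    // ConsumerStrategies} plus the Kafka StringDeserializer/ByteArrayDeserializer.
    private JavaInputDStream<ConsumerRecord<String, byte[]>> getDirectKafkaStream() {
        Map<String, Object> kafkaParams = new HashMap<>();
        kafkaParams.put("bootstrap.servers", "localhost:9092");          // placeholder
        kafkaParams.put("key.deserializer", StringDeserializer.class);
        kafkaParams.put("value.deserializer", ByteArrayDeserializer.class);
        kafkaParams.put("group.id", "example-group");                    // placeholder
        kafkaParams.put("auto.offset.reset", "latest");
        kafkaParams.put("enable.auto.commit", false);

        return KafkaUtils.createDirectStream(
                jssc,
                LocationStrategies.PreferConsistent(),
                ConsumerStrategies.<String, byte[]>Subscribe(
                        Collections.singletonList("example-topic"),      // placeholder
                        kafkaParams));
    }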

       JavaInputDStream<ConsumerRecord<String, byte[]>> rawStream = getDirectKafkaStream();

        JavaDStream<Tuple2<String, byte[]>> messagesTuple = rawStream.map(
                new Function<ConsumerRecord<String, byte[]>, Tuple2<String, byte[]>>() {
                    @Override
                    public Tuple2<String, byte[]> call(ConsumerRecord<String, byte[]> record) {
                        // Key each message by "<partition>-<offset>".
                        final String hyphen = "-";
                        final String partitionOffset = record.partition() + hyphen + record.offset();

                        return new Tuple2<>(partitionOffset, record.value());
                    }
                }
        );

        messagesTuple.foreachRDD(new VoidFunction<JavaRDD<Tuple2<String, byte[]>>>() {
            @Override
            public void call(JavaRDD<Tuple2<String, byte[]>> rdd) throws Exception {
                List<Tuple2<String, byte[]>> list = rdd.take(10);

                for (Tuple2<String, byte[]> pair : list) {
                    log.info("messages tuple key: " + pair._1() + " : " + pair._2());
                }
            }
        });


The above foreachRDD logs output correctly:

17/03/22 15:57:01 INFO StreamingKafkaConsumerDriver: messages tuple key: -13-231599504 : �2017-03-22 15:54:05.568628����$�g� ClientDev_Perf0585965449a1d3524b9e68396X@6eda8a884567b3442be68282b35aeeafMaterialReviewSinglePlayer`?��@�����Vwin��@1.0.1703.0Unlabeled Stable�8���Not ApplicableNot ApplicableNot ApplicabledayMR_Day01Empty�<<<BBBBBB@@@


However, when invoking mapPartitionsToPair on messagesTuple, a ClassCastException is thrown on accessing the second element of the pair ([B in the stack trace below is the JVM's internal name for byte[]).  A throwaway diagnostic is sketched at the end of this message.

      messagesTuple.mapPartitionsToPair(new RecordFlatMapPairPartitionFunction2(
                    outputDirectory, schemaServiceUrl, product, env, batchId, typeMap, avroSchemaMap, avroSchemaAcc));

17/03/22 15:57:02 WARN TaskSetManager: Lost task 0.0 in stage 1.0 (TID 1, ip-10-247-0-141.ec2.internal, executor 1): java.lang.ClassCastException: java.lang.String cannot be cast to [B
    at com.wb.analytics.spark.services.functions.RecordFlatMapPairPartitionFunction2.call(RecordFlatMapPairPartitionFunction2.java:113)



public class RecordFlatMapPairPartitionFunction2 implements
        PairFlatMapFunction<Iterator<Tuple2<String, byte[]>>, String, String> {
...

    @Override
    public Iterator<Tuple2<String, String>> call(Iterator<Tuple2<String, byte[]>> messages)
            throws Exception {

        while (messages.hasNext()) {
            Tuple2<String, byte[]> record = messages.next();
            String topicPartitionOffset = record._1();
            byte[] val = record._2();  // Line 113 <<<<<<<<<<<<<<<<<<<<<<<<<<< ClassCastException

       ...
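
As a sanity check, reading through a wildcard Tuple2 sidesteps the checkcast the compiler emits for the byte[] assignment, and lets me log what the second element actually is at runtime (throwaway diagnostic, not part of the job):

        // Diagnostic variant of the loop in call(): with Tuple2<?, ?> the value
        // comes back as Object, so no cast to byte[] occurs and we can inspect it.
        while (messages.hasNext()) {
            Tuple2<?, ?> raw = messages.next();
            Object val = raw._2();
            log.info("key=" + raw._1() + ", value runtime class: "
                    + (val == null ? "null" : val.getClass().getName()));
        }

If that logs java.lang.String, then despite the declared Iterator<Tuple2<String, byte[]>>, something upstream is handing this function Tuple2<String, String> elements.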