Posted to user@spark.apache.org by "Afshartous, Nick" <na...@wbgames.com> on 2017/03/22 17:18:50 UTC
[ Spark Streaming & Kafka 0.10 ] Possible bug
Hi,
I think I'm seeing a bug after upgrading to the Kafka 0.10 streaming API. Code fragments follow.
--
Nick
JavaInputDStream<ConsumerRecord<String, byte[]>> rawStream = getDirectKafkaStream();
JavaDStream<Tuple2<String, byte[]>> messagesTuple = rawStream.map(
    new Function<ConsumerRecord<String, byte[]>, Tuple2<String, byte[]>>() {
        @Override
        public Tuple2<String, byte[]> call(ConsumerRecord<String, byte[]> record) {
            final String hyphen = "-";
            final String topicPartition = record.partition() + hyphen + record.offset();
            return new Tuple2<>(topicPartition, record.value());
        }
    }
);
messagesTuple.foreachRDD(new VoidFunction<JavaRDD<Tuple2<String, byte[]>>>() {
        @Override
        public void call(JavaRDD<Tuple2<String, byte[]>> rdd) throws Exception {
            List<Tuple2<String, byte[]>> list = rdd.take(10);
            for (Tuple2<String, byte[]> pair : list) {
                log.info("messages tuple key: " + pair._1() + " : " + pair._2());
            }
        }
    }
);
The foreachRDD above logs the expected output:
17/03/22 15:57:01 INFO StreamingKafkaConsumerDriver: messages tuple key: -13-231599504 : �2017-03-22 15:54:05.568628����$�g� ClientDev_Perf0585965449a1d3524b9e68396X@6eda8a884567b3442be68282b35aeeafMaterialReviewSinglePlayer`?��@�����Vwin��@1.0.1703.0Unlabeled Stable�8���Not ApplicableNot ApplicableNot ApplicabledayMR_Day01Empty�<<<BBBBBB@@@
However, invoking mapPartitionsToPair on messagesTuple throws a ClassCastException when the second element of the pair is accessed.
messagesTuple.mapPartitionsToPair(new RecordFlatMapPairPartitionFunction2(
    outputDirectory, schemaServiceUrl, product, env, batchId, typeMap, avroSchemaMap, avroSchemaAcc));
17/03/22 15:57:02 WARN TaskSetManager: Lost task 0.0 in stage 1.0 (TID 1, ip-10-247-0-141.ec2.internal, executor 1): java.lang.ClassCastException: java.lang.String cannot be cast to [B
at com.wb.analytics.spark.services.functions.RecordFlatMapPairPartitionFunction2.call(RecordFlatMapPairPartitionFunction2.java:113)
public class RecordFlatMapPairPartitionFunction2 implements
        PairFlatMapFunction<Iterator<Tuple2<String, byte[]>>, String, String> {
    ...
    @Override
    public Iterator<Tuple2<String, String>> call(Iterator<Tuple2<String, byte[]>> messages)
            throws Exception {
        while (messages.hasNext()) {
            Tuple2<String, byte[]> record = messages.next();
            String topicPartitionOffset = record._1();
            byte[] val = record._2(); // Line 113 <<<<<<<<<<<<<<<<<<<<<<<<<<< ClassCastException
            ...
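
For reference, here is a minimal standalone sketch (no Spark or Kafka involved) of one way this exact symptom can arise. It is an illustration under an assumption, not a diagnosis: it assumes the values in the stream are actually Strings at runtime even though the declared type says byte[]. Whether that is the case here, and why it would be, is not shown by the fragments above. Because Java generics are erased at runtime, code that only treats the value as an Object (as logging via string concatenation typically does) would keep working, while the assignment to byte[] at line 113 would fail with "java.lang.String cannot be cast to [B". The class ErasureCastDemo and its methods are hypothetical names made up for the sketch, and Map.Entry stands in for Tuple2.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Map;

public class ErasureCastDemo {

    // Hypothetical helper: relabels the value type with an unchecked cast.
    // Generic type arguments are erased, so nothing is verified at runtime here.
    @SuppressWarnings("unchecked")
    static Map.Entry<String, byte[]> mislabel(Map.Entry<String, ?> entry) {
        return (Map.Entry<String, byte[]>) entry;
    }

    // Mirrors the logging step: the value is only used as an Object,
    // so no cast to byte[] is performed and a String value goes unnoticed.
    static String describe(Map.Entry<String, byte[]> pair) {
        Object value = pair.getValue();
        return "messages tuple key: " + pair.getKey() + " : " + value;
    }

    // Mirrors line 113: assigning to byte[] makes the compiler insert a
    // runtime cast, which throws if the value is really a String.
    static boolean valueIsReallyBytes(Map.Entry<String, byte[]> pair) {
        try {
            byte[] val = pair.getValue();
            return val != null;
        } catch (ClassCastException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Assumption for illustration: the payload is a String, not a byte[].
        Map.Entry<String, byte[]> pair =
            mislabel(new SimpleEntry<String, Object>("13-231599504", "payload"));
        System.out.println(describe(pair));
        System.out.println("value is byte[]: " + valueIsReallyBytes(pair));
    }
}
```

If this is what is happening, the readable text in the logged value above would be consistent with it, since a real byte[] normally logs as an array reference rather than as its contents. Checking the runtime class of record.value() inside the map function would be one way to confirm or rule it out.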