Posted to user@avro.apache.org by Ben Vogan <be...@shopkick.com> on 2017/01/19 16:25:30 UTC

NullPointerException trying to write a union of null & enum

Hi all,

I am trying to write a component in Spark that serializes a DataFrame
to Avro and loads it into Kafka.  I have been using code from spark-avro (
https://github.com/databricks/spark-avro/blob/branch-3.1/src/main/scala/com/databricks/spark/avro/AvroOutputWriter.scala#L82)
to do the translation, but I have run into a problem with enums.
spark-avro translates Avro enums into strings when reading, but does not
handle translating strings back into enums when writing (as far as I can
tell, the StringType case in createConverterToAvro just passes the value
through unchanged).

In my code, before calling into spark-avro, I check for StringType fields
and create a converter:

import scala.collection.JavaConverters._
import org.apache.avro.Schema
import org.apache.avro.generic.GenericData
import org.apache.spark.sql.types.StringType

// If the field is a ["null", enum] union (or a bare enum), return the enum
// schema so that string values can be converted to GenericData.EnumSymbol.
def getEnumType(avroSchema: Schema, fieldName: String): Option[Schema] = {
  val fieldSchema = avroSchema.getField(fieldName).schema()
  fieldSchema.getType match {
    case Schema.Type.UNION =>
      // getTypes returns a java.util.List, hence the asScala conversion
      fieldSchema.getTypes.asScala.find(_.getType == Schema.Type.ENUM)
    case Schema.Type.ENUM => Some(fieldSchema)
    case _                => None
  }
}
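
For what it's worth, this helper does find the enum branch when pointed at
a trimmed-down copy of my schema, e.g.:

val testSchema = new Schema.Parser().parse(
  """{"type":"record","name":"EventLog","namespace":"com.shopkick.data",
    |"fields":[{"name":"user_registration_type","type":["null",
    |{"type":"enum","name":"user_registration_type_enum",
    |"symbols":["cold","warm","paid"]}],"default":null}]}""".stripMargin)

// Resolves to the enum branch of the ["null", enum] union, as expected
assert(getEnumType(testSchema, "user_registration_type")
  .exists(_.getType == Schema.Type.ENUM))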

val fieldConverters = sparkSchema.fields.map { field =>
  if (field.dataType == StringType) {
    // Try to auto-convert strings to enums
    val enumType = getEnumType(avroSchema, field.name)
    if (enumType.isDefined) {
      // Return a function that translates the string into the enum symbol
      (item: Any) => {
        if (item == null) {
          null
        } else {
          new GenericData.EnumSymbol(enumType.get, item.toString)
        }
      }
    } else {
      // Not an enum, so just use the default string converter
      // (createConverterToAvro is taken from spark-avro's AvroOutputWriter)
      AvroSchemaConverter.createConverterToAvro(
        field.dataType, field.name, avroSchema.getNamespace)
    }
  } else {
    AvroSchemaConverter.createConverterToAvro(
      field.dataType, field.name, avroSchema.getNamespace)
  }
}
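
For context, the converters are then applied field by field as each row is
turned into a GenericRecord.  This is a simplified sketch of that step, not
my exact loader code:

import org.apache.spark.sql.Row
import org.apache.avro.generic.{GenericData, GenericRecord}

def rowToRecord(row: Row): GenericRecord = {
  val record = new GenericData.Record(avroSchema)
  sparkSchema.fields.zipWithIndex.foreach { case (field, i) =>
    // row.get(i) returns null for a SQL NULL; each converter handles null
    record.put(field.name, fieldConverters(i)(row.get(i)))
  }
  record
}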

This works for some records but not for others, and I do not understand
the difference.  When it fails I get the stack trace below, which seems to
indicate that Avro failed to resolve the union of "null" and the enum to
the null branch, and instead tried to write the null using the enum schema:

Caused by: java.lang.NullPointerException: null of com.shopkick.data.user_registration_type_enum of union in field user_registration_type of com.shopkick.data.EventLog
at org.apache.avro.generic.GenericDatumWriter.npe(GenericDatumWriter.java:132)
at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:126)
at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:73)
at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:60)
at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:87)
at io.confluent.kafka.serializers.KafkaAvroEncoder.toBytes(KafkaAvroEncoder.java:47)
at com.shopkick.data.pipeline.loader.KafkaAvroLoader$$anonfun$3$$anonfun$apply$1.apply(KafkaAvroLoader.scala:120)
at com.shopkick.data.pipeline.loader.KafkaAvroLoader$$anonfun$3$$anonfun$apply$1.apply(KafkaAvroLoader.scala:117)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at com.github.benfradet.spark.kafka010.writer.RDDKafkaWriter$$anonfun$writeToKafka$1.apply(RDDKafkaWriter.scala:50)
at com.github.benfradet.spark.kafka010.writer.RDDKafkaWriter$$anonfun$writeToKafka$1.apply(RDDKafkaWriter.scala:46)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$33.apply(RDD.scala:920)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$33.apply(RDD.scala:920)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1869)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1869)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.NullPointerException
at org.apache.avro.Schema$EnumSchema.getEnumOrdinal(Schema.java:749)
at org.apache.avro.generic.GenericDatumWriter.writeEnum(GenericDatumWriter.java:163)
at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:106)
at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:73)
at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:112)
at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:73)
at org.apache.avro.generic.GenericDatumWriter.writeField(GenericDatumWriter.java:153)
at org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:143)
at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:105)
... 22 more
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1433)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1421)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1420)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1420)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
at scala.Option.foreach(Option.scala:236)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:799)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1642)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1601)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1590)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1843)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1856)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1869)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1940)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:920)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:918)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:918)
at com.github.benfradet.spark.kafka010.writer.RDDKafkaWriter.writeToKafka(RDDKafkaWriter.scala:46)
at com.shopkick.data.pipeline.loader.KafkaAvroLoader.load(KafkaAvroLoader.scala:151)
at com.shopkick.data.pipeline.loader.TLoader.transform(TLoader.scala:22)
at com.shopkick.data.pipeline.transformer.CompositeTransformer$$anonfun$transform$1.apply(CompositeTransformer.scala:86)
at com.shopkick.data.pipeline.transformer.CompositeTransformer$$anonfun$transform$1.apply(CompositeTransformer.scala:85)
at scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:60)
at scala.collection.mutable.MutableList.foreach(MutableList.scala:30)
at com.shopkick.data.pipeline.transformer.CompositeTransformer.transform(CompositeTransformer.scala:85)
at com.memsql.spark.interface.DefaultPipelineMonitor$$anonfun$stepPipeline$4.apply(PipelineMonitor.scala:338)
at com.memsql.spark.interface.DefaultPipelineMonitor$$anonfun$stepPipeline$4.apply(PipelineMonitor.scala:318)
at com.memsql.spark.interface.DefaultPipelineMonitor.runPhase(PipelineMonitor.scala:410)
at com.memsql.spark.interface.DefaultPipelineMonitor.stepPipeline(PipelineMonitor.scala:318)
at com.memsql.spark.interface.DefaultPipelineMonitor.runPipeline(PipelineMonitor.scala:175)
at com.memsql.spark.interface.DefaultPipelineMonitor$$anon$1.run(PipelineMonitor.scala:129)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.kafka.common.errors.SerializationException: Error serializing Avro message


The enum is defined as:

{
   "name":"user_registration_type",
   "type":[
      "null",
      {
         "type":"enum",
         "name":"user_registration_type_enum",
         "symbols":[
            "cold",
            "warm",
            "paid"
         ]
      }
   ],
   "doc":"An enumeration for segmenting users by their registration type.
E.g. hot, warm, cold.",
   "default":null
}
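
As far as I understand Avro's union resolution, writing a plain null
through this union directly with a GenericDatumWriter should pick the
"null" branch, so I would expect a standalone test like the following to
succeed (reusing the trimmed-down testSchema from above):

import java.io.ByteArrayOutputStream
import org.apache.avro.generic.{GenericData, GenericDatumWriter, GenericRecord}
import org.apache.avro.io.EncoderFactory

val record = new GenericData.Record(testSchema)
record.put("user_registration_type", null)  // should resolve to the "null" branch

val out = new ByteArrayOutputStream()
val encoder = EncoderFactory.get().binaryEncoder(out, null)
new GenericDatumWriter[GenericRecord](testSchema).write(record, encoder)
encoder.flush()

If that expectation is right, then the value reaching the serializer for
the failing rows is presumably not a plain null, but I cannot see what my
converters would produce instead.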

Any insight would be greatly appreciated.  Thanks,

-- 
*BENJAMIN VOGAN* | Data Platform Team Lead
shopkick <http://www.shopkick.com/>