Posted to user@spark.apache.org by Michael Albert <m_...@yahoo.com.INVALID> on 2014/11/04 04:33:53 UTC
avro + parquet + vector<string> + NullPointerException while reading
Greetings!
I'm trying to use avro and parquet with the following schema:
{
  "name": "TestStruct",
  "namespace": "bughunt",
  "type": "record",
  "fields": [
    {
      "name": "string_array",
      "type": { "type": "array", "items": "string" }
    }
  ]
}
The writing process seems to be OK, but when I try to read it with Spark, I get:
com.esotericsoftware.kryo.KryoException: java.lang.NullPointerException
Serialization trace:
string_array (bughunt.TestStruct)
    at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:626)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:732)
When I try to read it with Hive, I get this:
Failed with exception java.io.IOException:org.apache.hadoop.hive.ql.metadata.HiveException: java.lang.ClassCastException: org.apache.hadoop.io.BytesWritable cannot be cast to org.apache.hadoop.io.ArrayWritable
This leads me to suspect that it might be related to https://github.com/Parquet/parquet-mr/issues/281, but that issue seems to be Hive-specific, whereas here Spark cannot even read data that it wrote itself.
I'm running on an Amazon EMR cluster using the "version 2.4.0" Hadoop code and Spark 1.1.0. Has anyone else observed this sort of behavior?
For completeness, here is the code that writes the data:
package bughunt

import org.apache.hadoop.mapreduce.Job
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import parquet.avro.AvroWriteSupport
import parquet.avro.AvroParquetOutputFormat
import parquet.hadoop.ParquetOutputFormat
import java.util.ArrayList

object GenData {
  val outputPath = "/user/xxxxx/testdata"
  val words = List(
    List("apple", "banana", "cherry"),
    List("car", "boat", "plane"),
    List("lion", "tiger", "bear"),
    List("north", "south", "east", "west"),
    List("up", "down", "left", "right"),
    List("red", "green", "blue"))

  def main(args: Array[String]) {
    val conf = new SparkConf(true)
      .setAppName("IngestLoanApplicattion")
      //.set("spark.kryo.registrator",
      //     classOf[CommonRegistrator].getName)
      .set("spark.serializer",
           "org.apache.spark.serializer.KryoSerializer")
      .set("spark.kryoserializer.buffer.mb", 4.toString)
      .set("spark.kryo.referenceTracking", "false")
    val sc = new SparkContext(conf)
    val rdd = sc.parallelize(words)

    // Configure Parquet to write Avro records with the generated TestStruct schema.
    val job = new Job(sc.hadoopConfiguration)
    ParquetOutputFormat.setWriteSupportClass(job, classOf[AvroWriteSupport])
    AvroParquetOutputFormat.setSchema(job,
      TestStruct.SCHEMA$)

    rdd.map(p => {
      // Copy each Scala list into a java.util.ArrayList for the Avro builder.
      val xs = new java.util.ArrayList[String]
      for (z <- p) { xs.add(z) }
      val bldr = TestStruct.newBuilder()
      bldr.setStringArray(xs)
      // The key is unused, so each record is written with a null key.
      (null, bldr.build()) })
      .saveAsNewAPIHadoopFile(outputPath,
        classOf[Void],
        classOf[TestStruct],
        classOf[ParquetOutputFormat[TestStruct]],
        job.getConfiguration)
  }
}
To read the data, I use this sort of code from the spark-shell:
:paste
import bughunt.TestStruct
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.SparkContext
import parquet.hadoop.ParquetInputFormat
import parquet.avro.AvroReadSupport

def openRddSpecific(sc: SparkContext) = {
  val job = new Job(sc.hadoopConfiguration)
  ParquetInputFormat.setReadSupportClass(job,
    classOf[AvroReadSupport[TestStruct]])
  sc.newAPIHadoopFile("/user/malbert/testdata",
    classOf[ParquetInputFormat[TestStruct]],
    classOf[Void],
    classOf[TestStruct],
    job.getConfiguration)
}
I start the Spark shell as follows:
spark-shell \
    --jars ../my-jar-containing-the-class-definitions.jar \
    --conf mapreduce.user.classpath.first=true \
    --conf spark.kryo.referenceTracking=false \
    --conf spark.kryoserializer.buffer.mb=4 \
    --conf spark.serializer=org.apache.spark.serializer.KryoSerializer
I'm stumped. I can read and write records and maps, but arrays/vectors elude me. Am I missing something obvious?
Thanks!
Sincerely, Mike Albert
Re: avro + parquet + vector<string> + NullPointerException while reading
Posted by Michael Albert <m_...@yahoo.com.INVALID>.
Thanks for the advice!
What seems to work for me is to define the array type as:
    "type": { "type": "array", "items": "string", "java-class": "java.util.ArrayList" }
Without this, it seems to be creating an Avro generic list (presumably org.apache.avro.generic.GenericData.Array), which Spark doesn't know how to serialize, instead of a java.util.List, which Spark likes.
Hive at 0.13.1 still can't read it though...
Thanks!
-Mike
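For readers who cannot change the schema, another possible approach (a sketch only, not something tried in this thread) is to copy the array field into a plain Scala List[String] straight after reading, so that the Avro list implementation never reaches Kryo. This assumes the failure happens when records are serialized between executors and the driver. The object and method names below are hypothetical and belong to no library.

```scala
// Hypothetical helper, not part of Spark, Avro, or Parquet.
object AvroListCopy {
  // Copies any Java list of character sequences into an immutable Scala
  // List[String]. Avro commonly hands back CharSequence values rather than
  // java.lang.String, hence the explicit toString on each element.
  def toScalaStrings(xs: java.util.List[_ <: CharSequence]): List[String] = {
    val out = List.newBuilder[String]
    val it = xs.iterator()
    while (it.hasNext) out += it.next().toString
    out.result()
  }
}
```

With the reading code from the original post, this might be applied as openRddSpecific(sc).map { case (_, rec) => AvroListCopy.toScalaStrings(rec.getStringArray) } before calling collect(). The getter name getStringArray is an assumption based on the setStringArray setter used in the writer.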
From: Michael Armbrust <mi...@databricks.com>
To: Michael Albert <m_...@yahoo.com>
Cc: "user@spark.apache.org" <us...@spark.apache.org>
Sent: Tuesday, November 4, 2014 2:37 PM
Subject: Re: avro + parquet + vector<string> + NullPointerException while reading
You might consider using the native parquet support built into Spark SQL instead of using the raw library:
http://spark.apache.org/docs/latest/sql-programming-guide.html#parquet-files
Re: avro + parquet + vector<string> + NullPointerException while reading
Posted by Michael Armbrust <mi...@databricks.com>.
You might consider using the native parquet support built into Spark SQL
instead of using the raw library:
http://spark.apache.org/docs/latest/sql-programming-guide.html#parquet-files
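As a rough illustration of that suggestion, the sketch below reads the same directory through Spark SQL using the Spark 1.1-era API (SQLContext.parquetFile and registerTempTable, both replaced in later releases). The object name, method name, and table name test_struct are made up for the example, and whether Spark SQL 1.1 can decode an array column written by parquet-avro is an assumption that has not been verified here.

```scala
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext

// Hypothetical wrapper; only the SQLContext calls are Spark API.
object ReadWithSparkSql {
  // Loads the Parquet files under `path` as a SchemaRDD, prints the schema
  // Spark SQL inferred, and prints the string_array column of every row.
  def printStringArrays(sc: SparkContext, path: String): Unit = {
    val sqlContext = new SQLContext(sc)
    val records = sqlContext.parquetFile(path)
    records.printSchema()
    // Register the data under a (made-up) table name so it can be queried.
    records.registerTempTable("test_struct")
    sqlContext.sql("SELECT string_array FROM test_struct")
      .collect()
      .foreach(println)
  }
}
```

From the spark-shell this could be called as ReadWithSparkSql.printStringArrays(sc, "/user/xxxxx/testdata"). Because rows come back as Spark SQL Row objects rather than TestStruct instances, the Avro-generated class would not need to pass through Kryo on this path.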