Posted to user@avro.apache.org by Chris Miller <cm...@gmail.com> on 2016/02/29 09:41:37 UTC
Avro Hive SerDe Issue
Hi,
I have a small Hadoop cluster with Hive, SparkSQL, etc. set up on Amazon EMR.
I have Avro files stored on S3 that I want to be able to access from
SparkSQL. I have confirmed the files are valid, and I am able to decode them
using avro-tools.
Here's the full schema for reference:
https://gist.github.com/ee99ee/8692a538b1a51f2dca5e
In SparkSQL, I created a table for these files as follows:
CREATE EXTERNAL TABLE IF NOT EXISTS impressions
PARTITIONED BY (year STRING, month STRING, day STRING)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe'
STORED AS INPUTFORMAT
'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat'
LOCATION 's3://analytics/prod/backend/avro/'
TBLPROPERTIES
('avro.schema.url'='hdfs:///data/schemas/impressionSchema.avsc')
Then I add a partition:
ALTER TABLE impressions ADD PARTITION (year='2016', month='02', day='26')
LOCATION 's3://analytics/prod/backend/avro/2016/02/26'
Looking at the table structure, everything seems to match the schema
correctly. If I run any query, I get an error:
SELECT COUNT(*) FROM impressions
org.apache.avro.AvroTypeException: Found MetaSchema, expecting union
    at org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:292)
    at org.apache.avro.io.parsing.Parser.advance(Parser.java:88)
    at org.apache.avro.io.ResolvingDecoder.readIndex(ResolvingDecoder.java:267)
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:155)
    at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:193)
    at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:183)
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:151)
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:142)
    at org.apache.hadoop.hive.serde2.avro.AvroDeserializer$SchemaReEncoder.reencode(AvroDeserializer.java:111)
    at org.apache.hadoop.hive.serde2.avro.AvroDeserializer.deserialize(AvroDeserializer.java:175)
    at org.apache.hadoop.hive.serde2.avro.AvroSerDe.deserialize(AvroSerDe.java:201)
    at org.apache.spark.sql.hive.HadoopTableReader$$anonfun$fillObject$2.apply(TableReader.scala:409)
    at org.apache.spark.sql.hive.HadoopTableReader$$anonfun$fillObject$2.apply(TableReader.scala:408)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.processInputs(TungstenAggregationIterator.scala:505)
    at org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.<init>(TungstenAggregationIterator.scala:686)
    at org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1$$anonfun$2.apply(TungstenAggregate.scala:95)
    at org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1$$anonfun$2.apply(TungstenAggregate.scala:86)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:710)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:710)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
    at org.apache.spark.scheduler.Task.run(Task.scala:89)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
I know the Avro files are valid because I can pull them down off S3, decode
them, and see all the data I expect. I'm pretty sure Hive is reading my
files because "MetaSchema" is the first field in the schema.
Anyone have any idea what is going on here or how I can further debug?
--
Chris Miller
Re: Avro Hive SerDe Issue
Posted by Chris Miller <cm...@gmail.com>.
So I've narrowed the issue down to the enum fields in the schema:
{
  "name": "action",
  "type": {
    "name": "ActionEnum",
    "type": "enum",
    "symbols": ["nextPage", "finish", "onethousandRegister", "userUpdate", "register"]
  }
},
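For reference, the Avro spec says an enum value is serialized as nothing more than the zero-based index of its symbol, written as a zigzag varint. A stdlib-only Python sketch of that encoding (zigzag_encode/zigzag_decode are hypothetical helper names, not the avro library's API):

```python
def zigzag_encode(n: int) -> bytes:
    """Encode a signed int as an Avro zigzag varint (how enum indices are written)."""
    z = (n << 1) ^ (n >> 63)  # zigzag-map signed -> unsigned
    out = bytearray()
    while True:
        if z >> 7:
            out.append((z & 0x7F) | 0x80)  # more bytes follow
            z >>= 7
        else:
            out.append(z & 0x7F)  # final byte, high bit clear
            return bytes(out)

def zigzag_decode(data: bytes) -> int:
    """Decode a zigzag varint back to a signed int."""
    z = shift = 0
    for b in data:
        z |= (b & 0x7F) << shift
        shift += 7
        if not (b & 0x80):
            break
    return (z >> 1) ^ -(z & 1)

SYMBOLS = ["nextPage", "finish", "onethousandRegister", "userUpdate", "register"]

# "onethousandRegister" is symbol index 2, so it is written as zigzag(2) = 0x04
encoded = zigzag_encode(SYMBOLS.index("onethousandRegister"))
assert encoded == b"\x04"
assert SYMBOLS[zigzag_decode(encoded)] == "onethousandRegister"
```

So on the wire there is nothing enum-specific to corrupt; whatever is going wrong is happening at schema-resolution time, not in the raw bytes.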
If I create a test schema with just a few string and int fields and encode
some sample data, everything works fine: I'm able to load the files from
S3 and run queries on them in SparkSQL. If I add the enum field above, for
example, I get the error I mentioned before, even though the Avro files are
valid and can be encoded and decoded with the avro-tools CLI utility without
issue.
I should also add that I can create the table and run queries without issue
using Hive, but not using SparkSQL. I see the tables in SparkSQL, but I get
the aforementioned error when running the same query. The issue seems
specific to the combination of the Avro decoder and SparkSQL.
How do I get Avro enum fields to work with SparkSQL? Maybe this is a bug in
org.apache.avro.io.ResolvingDecoder.java?
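In case it helps anyone narrow this down: for enums, the schema-resolution step that ResolvingDecoder performs is conceptually just a writer-index -> symbol -> reader-index remap, and the "expecting union" part of the error suggests the decoder's state machine thinks the reader schema has a union where the writer wrote an enum. A stdlib-only sketch of that remap (resolve_enum is a hypothetical helper, not the real Avro API):

```python
def resolve_enum(writer_symbols, reader_symbols, writer_index):
    """Remap an enum index written under the writer's schema onto the
    reader's schema, the way Avro enum schema resolution works."""
    symbol = writer_symbols[writer_index]
    if symbol not in reader_symbols:
        raise ValueError(f"symbol {symbol!r} not in reader schema")
    return reader_symbols.index(symbol)

writer = ["nextPage", "finish", "onethousandRegister", "userUpdate", "register"]
# A reader schema may list the same symbols in a different order.
reader = ["finish", "nextPage", "register", "userUpdate", "onethousandRegister"]

# Writer index 2 is "onethousandRegister", which is index 4 for this reader.
assert resolve_enum(writer, reader, 2) == 4
```

If the reader schema Spark hands the decoder doesn't model the enum as an enum at all (e.g. it sees a union or a string instead), this remap can't happen and you'd get exactly this kind of AvroTypeException.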
--
Chris Miller
On Mon, Feb 29, 2016 at 4:41 PM, Chris Miller <cm...@gmail.com>
wrote:
> ...