Posted to user@avro.apache.org by Chris Miller <cm...@gmail.com> on 2016/02/29 09:41:37 UTC

Avro Hive SerDe Issue

Hi,

I have a small Hadoop cluster with Hive, SparkSQL, etc. set up on Amazon EMR.
I have Avro files stored on S3 that I want to be able to access from SparkSQL.
I have confirmed the files are valid and I am able to decode them using
avro-tools.

Here's the full schema for reference:
https://gist.github.com/ee99ee/8692a538b1a51f2dca5e
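
For what it's worth, this is roughly how I've been checking the files with
avro-tools (the jar version and file name below are just examples):

java -jar avro-tools-1.7.7.jar getschema part-00000.avro
java -jar avro-tools-1.7.7.jar tojson part-00000.avro | head

Both print the schema and records I expect.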

In SparkSQL, I created a table for these files as follows:

CREATE EXTERNAL TABLE IF NOT EXISTS impressions
PARTITIONED BY (year STRING, month STRING, day STRING)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe'
STORED AS
  INPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat'
  OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat'
LOCATION 's3://analytics/prod/backend/avro/'
TBLPROPERTIES ('avro.schema.url'='hdfs:///data/schemas/impressionSchema.avsc')
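
As a quick sanity check on the schema mapping, describing the table is enough
to see the generated columns:

DESCRIBE impressions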

Then I add a partition:

ALTER TABLE impressions ADD PARTITION (year='2016', month='02', day='26')
LOCATION 's3://analytics/prod/backend/avro/2016/02/26'
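
To double-check that the partition registers, listing partitions shows it:

SHOW PARTITIONS impressions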

Looking at the table structure, everything seems to match the schema
correctly. However, any query I run fails with an error. For example:

SELECT COUNT(*) FROM impressions

org.apache.avro.AvroTypeException: Found MetaSchema, expecting union
    at org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:292)
    at org.apache.avro.io.parsing.Parser.advance(Parser.java:88)
    at org.apache.avro.io.ResolvingDecoder.readIndex(ResolvingDecoder.java:267)
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:155)
    at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:193)
    at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:183)
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:151)
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:142)
    at org.apache.hadoop.hive.serde2.avro.AvroDeserializer$SchemaReEncoder.reencode(AvroDeserializer.java:111)
    at org.apache.hadoop.hive.serde2.avro.AvroDeserializer.deserialize(AvroDeserializer.java:175)
    at org.apache.hadoop.hive.serde2.avro.AvroSerDe.deserialize(AvroSerDe.java:201)
    at org.apache.spark.sql.hive.HadoopTableReader$$anonfun$fillObject$2.apply(TableReader.scala:409)
    at org.apache.spark.sql.hive.HadoopTableReader$$anonfun$fillObject$2.apply(TableReader.scala:408)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.processInputs(TungstenAggregationIterator.scala:505)
    at org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.<init>(TungstenAggregationIterator.scala:686)
    at org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1$$anonfun$2.apply(TungstenAggregate.scala:95)
    at org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1$$anonfun$2.apply(TungstenAggregate.scala:86)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:710)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:710)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
    at org.apache.spark.scheduler.Task.run(Task.scala:89)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)

I know the Avro files are valid because I can pull them down off S3, decode
them, and see all the data I expect. I'm also pretty sure Hive is actually
reading my files, because "MetaSchema" (the type named in the error) is the
first field in the schema.
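
If it helps anyone reproduce this outside of Spark, a standalone reader that
resolves a data file against the same .avsc should exercise exactly the code
path in the trace above. A minimal sketch (the class name and file paths are
placeholders for a local copy of one of the S3 files plus the schema):

import java.io.File;

import org.apache.avro.Schema;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;

public class ResolveCheck {
  public static void main(String[] args) throws Exception {
    // Reader schema = the .avsc the table points at; the writer schema is
    // taken from the data file's own header by DataFileReader.
    Schema readerSchema = new Schema.Parser().parse(new File("impressionSchema.avsc"));
    GenericDatumReader<GenericRecord> datumReader =
        new GenericDatumReader<GenericRecord>(null, readerSchema);
    try (DataFileReader<GenericRecord> fileReader =
        new DataFileReader<GenericRecord>(new File("part-00000.avro"), datumReader)) {
      // Iterating forces the same ResolvingDecoder resolution that fails above.
      while (fileReader.hasNext()) {
        fileReader.next();
      }
    }
    System.out.println("resolved OK");
  }
}

If that runs cleanly, the writer/reader resolution itself is fine and the
problem is somewhere in the Spark/Hive re-encoding path.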

Does anyone have any idea what is going on here, or how I can debug this
further?

--
Chris Miller

Re: Avro Hive SerDe Issue

Posted by Chris Miller <cm...@gmail.com>.
So I've narrowed the issue down to the enum fields in the schema:

{
  "name": "action",
  "type": {
    "name": "ActionEnum",
    "type": "enum",
    "symbols": ["nextPage", "finish", "onethousandRegister", "userUpdate",
"register"]
  }
},

If I create a test schema with just a few string and int fields and encode
some sample data, everything works fine: I'm able to load the files from S3
and run queries on them in SparkSQL. As soon as I add the enum field above,
though, I get the error I mentioned before, even though the Avro files are
valid and can be encoded and decoded with the avro-tools CLI utility without
issue.
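
For reference, the stripped-down test schema was along these lines (the record
and scalar field names here are just placeholders; the action field is the
enum shown above):

{
  "type": "record",
  "name": "ImpressionTest",
  "namespace": "com.example",
  "fields": [
    {"name": "id", "type": "string"},
    {"name": "count", "type": "int"},
    {"name": "action", "type": {
      "name": "ActionEnum",
      "type": "enum",
      "symbols": ["nextPage", "finish", "onethousandRegister", "userUpdate", "register"]
    }}
  ]
}

With only the string and int fields the table queries fine from SparkSQL; as
soon as the action field is added, the AvroTypeException comes back.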

I should also add that I can create the table and run queries without issue
using Hive, but not using SparkSQL. I can see the table in SparkSQL, but I get
the aforementioned error when running the same query, so the issue seems
specific to the combination of the Avro decoder and SparkSQL.
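
Concretely, the same query behaves differently depending on the client (both
run against the same metastore; the invocations below are just examples):

hive -e "SELECT COUNT(*) FROM impressions"        # returns a count
spark-sql -e "SELECT COUNT(*) FROM impressions"   # throws the AvroTypeException above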

How do I get Avro enum fields to work with SparkSQL? Could this be a bug in
org.apache.avro.io.ResolvingDecoder?

--
Chris Miller
