You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@parquet.apache.org by Brian Henriksen <Br...@humedica.com> on 2016/01/20 16:31:13 UTC

MR job fails to read Parquet files with AvroReadSupport

Hello,

I could not find a user mailing list, please direct me to that if that is more appropriate for this question.

I have a Map-Reduce action that is part of an Oozie workflow that reads Parquet files from HDFS.  The purpose of the action is to read each Parquet record and output them as ‘|’ delimitated text.  I am working with CDH 4, Parquet 1.6.0, and the old apache.mapred API.  Here is the error I am getting, it appears to be occurring as mappers attempt to read the Parquet records and convert them to Avro (as I am using AvroReadSupport):



2016-01-19 16:46:42,626 WARN org.apache.hadoop.mapred.Child: Error running child
parquet.io.ParquetDecodingException: Can not read value at 0 in block 0 in file hdfs://nameservice1/group/foam/prototypes/parquet/ingest/201511/H788798/LDR/part-00023-r-00023.snappy.parquet
at parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:177)
at parquet.hadoop.ParquetRecordReader.nextKeyValue(ParquetRecordReader.java:130)
at parquet.hadoop.mapred.DeprecatedParquetInputFormat$RecordReaderWrapper.<init>(DeprecatedParquetInputFormat.java:90)
at parquet.hadoop.mapred.DeprecatedParquetInputFormat.getRecordReader(DeprecatedParquetInputFormat.java:47)
at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:394)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:332)
at org.apache.hadoop.mapred.Child$4.run(Child.java:268)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:415)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1438)
at org.apache.hadoop.mapred.Child.main(Child.java:262)
Caused by: org.apache.avro.AvroRuntimeException: Bad index: 2
at org.apache.avro.mapred.Pair.put(Pair.java:155)
at parquet.avro.AvroIndexedRecordConverter.set(AvroIndexedRecordConverter.java:116)
at parquet.avro.AvroIndexedRecordConverter.access$000(AvroIndexedRecordConverter.java:38)
at parquet.avro.AvroIndexedRecordConverter$1.add(AvroIndexedRecordConverter.java:76)
at parquet.avro.AvroIndexedRecordConverter$FieldStringConverter.addBinary(AvroIndexedRecordConverter.java:252)
at parquet.column.impl.ColumnReaderImpl$2$7.writeValue(ColumnReaderImpl.java:322)
at parquet.column.impl.ColumnReaderImpl.writeCurrentValueToConverter(ColumnReaderImpl.java:369)
at parquet.io.RecordReaderImplementation.read(RecordReaderImplementation.java:400)
at parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:173)


What is this “Pair” schema that the program is trying to put values into?  The schema of my data has 13 fields, how do I provide it to the InputFormat or ReadSupport class?
Also, should my mapper be written to work with Parquet Group values or Avro GenericRecord values?  Here is my Oozie config for the action:

    <action name="PrepareCdrExportMR">
        <map-reduce>
            <job-tracker>${jobTracker}</job-tracker>
            <name-node>${nameNode}</name-node>
            <prepare>
                <delete path="${nameNode}/group/foam/prototypes/parquet/ingest/${release}/${group}/CDR_EXPORT"/>
            </prepare>
            <configuration>
            <property>
                <name>mapred.input.dir</name>
                <value>/group/foam/prototypes/parquet/ingest/${release}/${group}/LDR</value>
            </property>

            <property>
                <name>mapred.output.dir</name>
                <value>${nameNode}/group/foam/prototypes/parquet/ingest/${release}/${group}/CDR_EXPORT</value>
            </property>


            <property>
                <name>mapred.mapper.class</name>
                <value>com.humedica.duplo.foam.CdrExportMapper</value>
            </property>

            <property>
                <name>mapred.reducer.class</name>
                <value>com.humedica.duplo.foam.CdrExportReducer</value>
            </property>

                <property>
                    <name>mapred.output.compress</name>
                    <value>false</value>
                </property>

                <property>
                    <name>mapred.output.compression.codec</name>
                    <value>org.apache.hadoop.io.compress.GzipCodec</value>
                </property>

                <property>
                    <name>mapred.input.format.class</name>
                    <value>parquet.hadoop.mapred.DeprecatedParquetInputFormat</value>
                </property>

                <property>
                    <name>mapred.output.format.class</name>
                    <value>org.apache.hadoop.mapred.TextOutputFormat</value>
                </property>

                <property>
                    <name>mapred.textoutputformat.separator</name>
                    <value>|</value>
                </property>

                <property>
                    <name>mapred.reduce.tasks</name>
                    <value>8</value>
                </property>

                <property>
                    <name>parquet.read.support.class</name>
                    <value>parquet.avro.AvroReadSupport</value>
                </property>

                <property>
                    <name>avro.schema</name>
                    <value>{
                        "namespace": "org.apache.avro.mapred",
                        "type": "record",
                        "name": "Foo",
                        "fields": [
                        {"name": "sourceid", "type": "string"},
                        {"name": "patientid", "type": "string"},
                        {"name": "encounterid", "type": "string"},
                        {"name": "encounterdate", "type": "string"},
                        {"name": "groupid", "type": "string"},
                        {"name": "cdsid", "type": "string"},
                        {"name": "seq", "type": "string"},
                        {"name": "section", "type": "string"},
                        {"name": "foam", "type": "string"},
                        {"name": "head", "type": "string"},
                        {"name": "body", "type": "string"},
                        {"name": "tail", "type": "string"},
                        {"name": "observationdate", "type": "string"},
                        {"name": "source", "type": "string"}
                        ]
                        }
                    </value>
                </property>

                <property>
                    <name>fs.hdfs.impl.disable.cache</name>
                    <value>true</value>
                </property>


                <property>
                    <name>mapred.output.key.class</name>
                    <value>org.apache.hadoop.io.NullWritable</value>
                </property>

                <property>
                    <name>mapred.output.value.class</name>
                    <value>org.apache.hadoop.io.Text</value>
                </property>

                <property>
                    <name>mapred.mapoutput.key.class</name>
                    <value>org.apache.hadoop.io.NullWritable</value>
                </property>

                <property>
                    <name>mapred.mapoutput.value.class</name>
                    <value>org.apache.hadoop.io.Text</value>
                </property>

                <property>
                    <name>io.serializations</name>
                    <value>org.apache.hadoop.io.serializer.WritableSerialization,org.apache.avro.mapred.AvroSerialization</value>
                </property>
            </configuration>



        </map-reduce>
        <ok to="HdfsToLocal"/> <!-- To HdfsToLocal-->
        <error to="PrepareCdrExportFailureDBUpdate"/>
    </action>