Posted to user@avro.apache.org by 幻 <yg...@gmail.com> on 2011/03/18 04:13:40 UTC

Hi,all. How can I involve two avro files with different schema into one M/R job?

Hi all,
I currently have two Avro files with different schemas. I found that I
have to set the schema before running an M/R job if the files are in Avro
format, but the files' schemas are probably not the same. How can I do
that without setting the schema before running a job? Thanks.


Best Regards,
Xu Yingzhong

Re: Hi,all. How can I involve two avro files with different schema into one M/R job?

Posted by 幻 <yg...@gmail.com>.
Sorry, I made some mistakes. This is the AvroInputFormat part:
protected FileStatus[] listStatus(JobConf job) throws IOException {
  List<FileStatus> result = new ArrayList<FileStatus>();
  for (FileStatus file : super.listStatus(job)) {
    if (file.getPath().getName().endsWith(AvroOutputFormat.EXT)) {
      // Store this file's schema in the job configuration.
      this.setJobSchemas(job, file);
      result.add(file);
    }
  }
  return result.toArray(new FileStatus[0]);
}

private void setJobSchemas(JobConf job, FileStatus file) throws IOException {
  // Use the job's own configuration. The FileSystem is not closed here,
  // because FileSystem.get() may return a cached instance shared with other code.
  FileSystem fs = FileSystem.get(file.getPath().toUri(), job);
  FSDataInputStream hdfsInStream = fs.open(file.getPath());
  DataFileStream<Object> stream =
      new DataFileStream<Object>(hdfsInStream, new GenericDatumReader<Object>());
  try {
    // Debug logging through my own helper class.
    UtilHelper.localTest("--Schema:" + stream.getSchema().toString(),
                         "/opt/hivelogs/root/mine.log");
    // Key the file's schema by its path so the record reader can look it up.
    job.set(file.getPath() + "-schema", stream.getSchema().toString());
  } finally {
    stream.close();  // also closes the underlying input stream
  }
}


Re: Hi,all. How can I involve two avro files with different schema into one M/R job?

Posted by 幻 <yg...@gmail.com>.
Thanks. I ran into this problem because I want to use Avro as a storage
format for Hive. I have found a solution, but I'm not sure whether it's
good enough. I changed AvroInputFormat to:
protected FileStatus[] listStatus(JobConf job) throws IOException {
  List<FileStatus> result = new ArrayList<FileStatus>();
  for (FileStatus file : super.listStatus(job)) {
    if (file.getPath().getName().endsWith(AvroOutputFormat.EXT)) {
      // Store this file's schema in the job configuration.
      this.setJobSchemas(job, file);
      result.add(file);
    }
  }
  return result.toArray(new FileStatus[0]);
}
And in AvroRecordReader:
  public AvroRecordReader(JobConf job, FileSplit split)
    throws IOException {
    // Original version, which used the single job-wide reader schema:
    /*
    this(DataFileReader.openReader
         (new FsInput(split.getPath(), job),
          job.getBoolean(AvroJob.INPUT_IS_REFLECT, false)
          ? new ReflectDatumReader<T>(AvroJob.getInputSchema(job))
          : new SpecificDatumReader<T>(AvroJob.getInputSchema(job))),
         split);
     */
    // Modified version: use the schema stored under "<path>-schema"
    // by setJobSchemas() in AvroInputFormat.
    this(DataFileReader.openReader
         (new FsInput(split.getPath(), job),
          job.getBoolean(AvroJob.INPUT_IS_REFLECT, false)
          ? new ReflectDatumReader<T>(
              Schema.parse(job.get(split.getPath().toString() + "-schema")))
          : new SpecificDatumReader<T>(
              Schema.parse(job.get(split.getPath().toString() + "-schema")))),
         split);
  }


Re: Hi,all. How can I involve two avro files with different schema into one M/R job?

Posted by Doug Cutting <cu...@apache.org>.
On 03/18/2011 11:31 AM, Harsh J wrote:
> Probably a small case, in which I would require reading from multiple
> sources in my job (perhaps even process them differently until the Map
> phase), with special reader-schemas for each of my sources.

How would your mapper detect which schema was in use?  Would it use
something like instanceof?  If that's the case, then you could simply
use a union as the job's schema.
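
As a rough illustration of the union-plus-instanceof idea, the sketch below dispatches on the runtime type of each datum. It is library-free: A and B are hypothetical stand-ins for classes that would be generated from two record schemas (here a.A and b.B, the example schemas used elsewhere in this thread), and the class and method names are assumptions, not part of Avro's API.

```java
/**
 * Sketch of dispatching on the runtime type of a union-typed map input.
 * A and B are hypothetical stand-ins for classes generated from a.A and b.B.
 */
final class UnionDispatch {
  /** JSON for a union reader schema that covers both record types. */
  static final String UNION_SCHEMA =
      "[{\"type\":\"record\",\"name\":\"a.A\",\"fields\":[{\"name\":\"foo\",\"type\":\"int\"}]},"
      + "{\"type\":\"record\",\"name\":\"b.B\",\"fields\":[{\"name\":\"bar\",\"type\":\"int\"}]}]";

  static final class A {
    final int foo;
    A(int foo) { this.foo = foo; }
  }

  static final class B {
    final int bar;
    B(int bar) { this.bar = bar; }
  }

  /** What the body of map() might do with each datum it is handed. */
  static String describe(Object datum) {
    if (datum instanceof A) {
      return "A.foo=" + ((A) datum).foo;
    } else if (datum instanceof B) {
      return "B.bar=" + ((B) datum).bar;
    }
    throw new IllegalArgumentException("unexpected datum: " + String.valueOf(datum));
  }

  public static void main(String[] args) {
    System.out.println(describe(new A(1)));  // prints A.foo=1
    System.out.println(describe(new B(2)));  // prints B.bar=2
  }
}
```

In a real job the union JSON would be parsed into a Schema and set as the job's input schema; that step is left out here so the sketch stays self-contained.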

Or would you want a different mapper for each input type?  That seems
like a higher-level tool, like Hadoop's MultipleInputs, which shouldn't
be too hard to build, but I don't think should be built into the base
MapReduce API, but rather a layer above it, no?

Doug

Re: Hi,all. How can I involve two avro files with different schema into one M/R job?

Posted by Harsh J <qw...@gmail.com>.
On Fri, Mar 18, 2011 at 11:38 PM, Doug Cutting <cu...@apache.org> wrote:
> Is that what you're after?  Why would you need this?

Probably a small case, in which I would require reading from multiple
sources in my job (perhaps even process them differently until the Map
phase), with special reader-schemas for each of my sources.

This could be custom-built easily, but I just wondered if general
use-cases of avro datafiles could benefit from such a thing.

Right now AvroJob.setInputSchema(...) sets the given schema as
"avro.input.schema" in the Job, and my suggestion was to make it
something like /path/1+avro.input.schema, /path/2+avro.input.schema so
that each instantiated record reader for mappers (via MultipleInputs)
can pick up its own special reader schema (since they get a /path/2
via FileSplit).
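
The key scheme suggested above could be sketched roughly as follows. This is only an illustration: a plain Map stands in for the Hadoop JobConf, and the class and method names (PerPathSchemaKeys, keyFor, setInputSchema, getInputSchema) are hypothetical, not existing Avro or Hadoop API. The fallback to the job-wide "avro.input.schema" key is also an assumption.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Sketch of per-path reader-schema keys. A plain Map stands in for the
 * Hadoop JobConf; the class and method names here are hypothetical.
 */
final class PerPathSchemaKeys {
  static final String INPUT_SCHEMA = "avro.input.schema";

  /** Builds a key such as "/path/1+avro.input.schema". */
  static String keyFor(String inputPath) {
    return inputPath + "+" + INPUT_SCHEMA;
  }

  /** Stores the reader schema (as JSON) for one input path. */
  static void setInputSchema(Map<String, String> conf, String inputPath, String schemaJson) {
    conf.put(keyFor(inputPath), schemaJson);
  }

  /**
   * Looks up the schema for the path a split came from, falling back to
   * the job-wide "avro.input.schema" entry when no per-path entry exists.
   */
  static String getInputSchema(Map<String, String> conf, String splitPath) {
    String perPath = conf.get(keyFor(splitPath));
    return perPath != null ? perPath : conf.get(INPUT_SCHEMA);
  }

  public static void main(String[] args) {
    Map<String, String> conf = new HashMap<>();
    conf.put(INPUT_SCHEMA, "\"string\"");
    setInputSchema(conf, "/path/1", "\"int\"");
    setInputSchema(conf, "/path/2", "\"long\"");
    System.out.println(getInputSchema(conf, "/path/2"));  // per-path entry
    System.out.println(getInputSchema(conf, "/path/3"));  // job-wide fallback
  }
}
```

A real implementation would also have to map a split's file path back to the input directory or glob it was configured under, which this sketch does not attempt.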

-- 
Harsh J
http://harshj.com

Re: Hi,all. How can I involve two avro files with different schema into one M/R job?

Posted by Doug Cutting <cu...@apache.org>.
On 03/18/2011 09:54 AM, Harsh J wrote:
> Would it help if the provided JSON schemae were added to the JobConf
> with the given path(s) as a prefix to the key used to retrieve them?
> This would help use with MultipleInputs and such (but it may get
> complicated to do if globs were involved?).

Not sure what you mean by "provided".  The user must specify a reader
schema for the job, and map tasks will be passed instances of this
schema.  That reader schema is available as AvroJob.getInputSchema(job).

If you wanted to know the writer's schema of each file read, then I
suppose we could have AvroRecordReader set that to a job property, or
simply expose this by adding a method like:

Schema AvroRecordReader.getSchema() { return reader.getSchema(); }

Is that what you're after?  Why would you need this?

Doug


Re: Hi,all. How can I involve two avro files with different schema into one M/R job?

Posted by Harsh J <qw...@gmail.com>.
Doug,

Would it help if the provided JSON schemae were added to the JobConf
with the given path(s) as a prefix to the key used to retrieve them?
This would help use with MultipleInputs and such (but it may get
complicated to do if globs were involved?).

-- 
Harsh J
http://harshj.com

Re: Hi,all. How can I involve two avro files with different schema into one M/R job?

Posted by Doug Cutting <cu...@apache.org>.
On 03/17/2011 08:13 PM, 幻 wrote:
>      Currently,I have two avro files with different schema. I found that
> I have to set the schema before running a M/R job if the files are in
> avro format.But the schema of the files are probably not the same.How
> can I do that without setting the schema before running a job? Thanks.

The schema you set for the job is the reader's schema.  The schema in
the input files is the writer's schema and need not match this exactly.  It
will be projected to the reader's schema, as described in the
specification, particularly in the "Schema Resolution" section.

http://avro.apache.org/docs/current/spec.html#Schema+Resolution

The aliases section is also relevant:

http://avro.apache.org/docs/current/spec.html#Aliases

This can be used to extract fields from different schemas into a common
data structure.  For example, if your input files use the following two
schemas:

{"type":"record", "name":"a.A", "fields":[{"name":"foo", "type":"int"}]}
{"type":"record", "name":"b.B", "fields":[{"name":"bar", "type":"int"}]}

then the following record can read both:

{"type":"record", "name":"my.MapInput",
 "aliases":["a.A","b.B"],
 "fields":[{"name":"x", "type":"int", "aliases":["foo","bar"]}]
}

The reader's schema can thus include a common subset of fields in
inputs.  It can map fields of compatible types that are named
differently to a common field.  It can include fields that are not in
all inputs, so long as they have a default value in the reader's schema.
 It could include all data from all inputs, e.g., in the above case:

{"type":"record", "name":"my.MapInput",
 "aliases":["a.A","b.B"],
 "fields":[
   {"name":"foo", "type":"int", "default": -1},
   {"name":"bar", "type":"int", "default": -1}
  ]
}
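
To make the rules above concrete, here is a simplified, library-free model of how one reader field gets its value from a writer record: match by name, then by alias, then fall back to the default. It is not Avro's implementation and ignores type promotion; the class and method names (AliasProjection, ReaderField, resolve) are hypothetical, and a Map stands in for a decoded writer record.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Simplified model of filling one reader field from a writer record.
 * This is not Avro's implementation; all names here are hypothetical.
 */
final class AliasProjection {
  static final class ReaderField {
    final String name;
    final List<String> aliases;
    final Object defaultValue;  // null means "no default"

    ReaderField(String name, List<String> aliases, Object defaultValue) {
      this.name = name;
      this.aliases = aliases;
      this.defaultValue = defaultValue;
    }
  }

  /** Resolves a reader field by name, then by alias, then by default. */
  static Object resolve(ReaderField field, Map<String, Object> writerRecord) {
    if (writerRecord.containsKey(field.name)) {
      return writerRecord.get(field.name);
    }
    for (String alias : field.aliases) {
      if (writerRecord.containsKey(alias)) {
        return writerRecord.get(alias);
      }
    }
    if (field.defaultValue != null) {
      return field.defaultValue;
    }
    throw new IllegalStateException("no value or default for field " + field.name);
  }

  public static void main(String[] args) {
    Map<String, Object> a = new HashMap<>();  // a record written as a.A
    a.put("foo", 7);
    Map<String, Object> b = new HashMap<>();  // a record written as b.B
    b.put("bar", 9);

    // First reader schema: one field "x" with aliases foo and bar.
    ReaderField x = new ReaderField("x", Arrays.asList("foo", "bar"), null);
    System.out.println(resolve(x, a));  // prints 7
    System.out.println(resolve(x, b));  // prints 9

    // Second reader schema: field "foo" with default -1, read from a b.B record.
    ReaderField foo = new ReaderField("foo", Collections.<String>emptyList(), -1);
    System.out.println(resolve(foo, b));  // prints -1
  }
}
```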

So there's a fair amount of flexibility available.

Doug