Posted to user@crunch.apache.org by Marcin Michalski <mm...@ifwe.co> on 2016/02/24 09:17:58 UTC

Best way to pass GenericData.Record from one fn to the next one

Hi, is there an easy way to pass GenericData.Record instances between Fns in
Crunch without explicitly stating the schema? I want to pass multiple Avro
files that have various schemas as input to a single DoFn, which will
enhance the data into a Pair. Later I want to run an aggregation
(deduping) Fn on that data, but I don't want to specify the schema in between;
I just want to work with GenericData.Record instances. Here is an example:

PCollection<Record> messages =
    pipeline.read(From.avroFile("/events/*/20160223/"));

// I don't want to pass the schema instance, but rather just work with
// GenericData.Record. Is that possible? Or do I need to use Avros.bytes()
// instead and then reconstruct the Record later in the next Fn?
messages.parallelDo(new EventEnhancerDoFn(),
    Avros.generics(messageSchema)).groupByKey...


Thanks,
Marcin
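The Avros.bytes() route mentioned above only works if the next Fn can recover the writer schema, so one option is to carry the schema text alongside each record's bytes. The sketch below shows just that framing in plain Java: it packs a schema's JSON text and an already-serialized payload into one length-prefixed byte array and unpacks them again. The class name SchemaEnvelope is hypothetical, and the payload is assumed to come from elsewhere (for example a GenericDatumWriter with Avro's binary encoder); a real downstream Fn would parse the recovered schema text and decode the payload with it.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

/**
 * Hypothetical helper: frames a schema's JSON text together with an
 * already-serialized record so a downstream Fn can recover both without
 * being told the schema up front.
 *
 * Layout: [int schemaLength][schema UTF-8 bytes][int payloadLength][payload bytes]
 */
final class SchemaEnvelope {
  private SchemaEnvelope() {}

  /** Holds the two parts recovered by unpack(). */
  static final class Unpacked {
    final String schemaJson;
    final byte[] payload;

    Unpacked(String schemaJson, byte[] payload) {
      this.schemaJson = schemaJson;
      this.payload = payload;
    }
  }

  /** Writes both length prefixes big-endian, followed by the raw bytes. */
  static byte[] pack(String schemaJson, byte[] payload) {
    byte[] schemaBytes = schemaJson.getBytes(StandardCharsets.UTF_8);
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (DataOutputStream out = new DataOutputStream(bytes)) {
      out.writeInt(schemaBytes.length);
      out.write(schemaBytes);
      out.writeInt(payload.length);
      out.write(payload);
    } catch (IOException e) {
      // An in-memory stream should not fail, but DataOutputStream declares IOException.
      throw new UncheckedIOException(e);
    }
    return bytes.toByteArray();
  }

  /** Reverses pack(); truncated input surfaces as an EOFException wrapped here. */
  static Unpacked unpack(byte[] envelope) {
    try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(envelope))) {
      byte[] schemaBytes = new byte[in.readInt()];
      in.readFully(schemaBytes);
      byte[] payload = new byte[in.readInt()];
      in.readFully(payload);
      return new Unpacked(new String(schemaBytes, StandardCharsets.UTF_8), payload);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }
}
```

Repeating the full schema text in every record costs space, so this trade-off suits heterogeneous or low-volume inputs better than large datasets that share one schema.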

Re: Best way to pass GenericData.Record from one fn to the next one

Posted by Marcin Michalski <mm...@ifwe.co>.
Actually, this doesn't work, since there seems to be an issue with passing
multiple inputs that have different schemas to
pipeline.read(From.avroFile(paths)). This is because From only takes the
schema from the first of the input paths, which I think is a bit misleading.
Can I extend this code to return a List<Source<GenericData.Record>>, one for
every schema that I am working with?

  /**
   * Creates a {@code Source<GenericData.Record>} by reading the schema of the Avro file
   * at the given paths using the {@code FileSystem} information contained in the given
   * {@code Configuration} instance. If the first path is a directory, the schema of a file in
   * the directory will be used to determine the schema to use.
   *
   * @param paths The path to the data on the filesystem
   * @param conf The configuration information
   * @return A new {@code Source<GenericData.Record>} instance
   */
  public static Source<GenericData.Record> avroFile(List<Path> paths, Configuration conf) {
    Preconditions.checkArgument(!paths.isEmpty(), "At least one path must be supplied");
    // Only the schema of the first path is used for every path in the list.
    return avroFile(paths, Avros.generics(getSchemaFromPath(paths.get(0), conf)));
  }
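If the inputs are split by schema, the core step is grouping the input paths so that each group shares one writer schema, then building one source per group. The sketch below shows only that grouping with plain Java collections. It assumes each path's schema has already been read (for instance from the Avro file header) into a map from path to schema JSON text; SchemaGrouping and groupPathsBySchema are hypothetical names. Each resulting group could then presumably be converted to Hadoop Path objects and passed to the two-argument avroFile(paths, Avros.generics(schema)) overload that the method above delegates to, with each resulting PCollection processed by the same DoFn.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical helper: groups input paths by their Avro writer schema so
 * that each group can be read with a single, correct schema.
 */
final class SchemaGrouping {
  private SchemaGrouping() {}

  /**
   * @param schemaByPath path -> schema JSON text, in the order the paths were listed
   * @return schema JSON text -> paths that use it; groups appear in order of
   *         first occurrence and paths keep their original relative order
   */
  static Map<String, List<String>> groupPathsBySchema(Map<String, String> schemaByPath) {
    Map<String, List<String>> pathsBySchema = new LinkedHashMap<>();
    for (Map.Entry<String, String> entry : schemaByPath.entrySet()) {
      pathsBySchema
          .computeIfAbsent(entry.getValue(), schema -> new ArrayList<>())
          .add(entry.getKey());
    }
    return pathsBySchema;
  }
}
```

Grouping by raw JSON text treats schemas that differ only in formatting as different; keying on parsed org.apache.avro.Schema objects, whose equals() compares structure, would likely be more forgiving.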

I see that there is another method I could potentially use, avroTableFile,
but I am not sure whether it is intended to be used for this kind of input.

  /**
   * Creates a {@code TableSource<K,V>} for reading an Avro key/value file at the given paths.
   *
   * @param paths list of paths to be read by the returned source
   * @param tableType Avro table type for deserializing the table data
   * @return a new {@code TableSource<K,V>} instance for reading Avro key/value data
   */
  public static <K, V> TableSource<K, V> avroTableFile(List<Path> paths, PTableType<K, V> tableType) {
    return new AvroTableFileSource<K, V>(paths, (AvroType<Pair<K, V>>) tableType);
  }




On Wed, Feb 24, 2016 at 12:51 PM, Marcin Michalski <mm...@ifwe.co>
wrote:




-- 
Marcin Michalski | Big Data Engineer
mmichalski@ifwe.co <am...@ifwe.co> | (917) 478-9422 (c)
<http://www.ifwe.co/>
Tagged, Inc. is now if(we). Learn more at ifwe.co

Re: Best way to pass GenericData.Record from one fn to the next one

Posted by Marcin Michalski <mm...@ifwe.co>.
I am going to try writing the generic input Record into a new Avro schema
that has some new fields plus the original input record as a JSON string.
Then I will dedupe the records by grouping key, and then invoke another Fn
that reconstructs the record in its original Avro schema from the JSON value
and finally writes it using FSDataOutputStream. I think that should work, but
it is a bit hacky.
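The dedupe step in this plan (group by key, keep one record per key) can be sketched in plain Java to pin down the intended semantics. Each element here is a hypothetical envelope, KeyedJson, holding the grouping key and the original record as JSON text, and the sketch keeps the first record seen for each key. In Crunch the same effect would come from groupByKey() followed by a DoFn that emits one value per group; after a shuffle the order of values within a group is generally not guaranteed, so "first" there means an arbitrary representative unless the values are sorted.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical envelope: a grouping key plus the original record as JSON text. */
final class KeyedJson {
  final String key;
  final String json;

  KeyedJson(String key, String json) {
    this.key = key;
    this.json = json;
  }
}

final class Dedupe {
  private Dedupe() {}

  /** Keeps the first envelope for each key, preserving first-seen key order. */
  static List<KeyedJson> firstPerKey(List<KeyedJson> input) {
    Map<String, KeyedJson> firstByKey = new LinkedHashMap<>();
    for (KeyedJson item : input) {
      // putIfAbsent leaves an existing entry untouched, so later duplicates are dropped.
      firstByKey.putIfAbsent(item.key, item);
    }
    return new ArrayList<>(firstByKey.values());
  }
}
```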

On Wed, Feb 24, 2016 at 7:52 AM, Josh Wills <jo...@gmail.com> wrote:


Re: Best way to pass GenericData.Record from one fn to the next one

Posted by Josh Wills <jo...@gmail.com>.
In theory, any PType that supports GenericRecord will work, even a dummy
one that defines a schema that isn't the same as the one you're using.

I don't recommend doing that, of course, but it will work.

On Wed, Feb 24, 2016 at 12:18 AM Marcin Michalski <mm...@ifwe.co>
wrote:
