Posted to user@crunch.apache.org by Christian Tzolov <ch...@gmail.com> on 2014/06/10 11:42:51 UTC

Create custom Crunch source (CustomDataSource) for CustomInputFormat

Hi all,

I am trying to create a Crunch source for a custom InputFormat that is
structured like this: CustomInputFormat<Void, CustomNonWritableClass>

I've tried two implementations, with no success. I must be missing
something, but I'm not sure what.

Implementation 1: Derive PType<Pair<Void, CustomNonWritableClass>> using
MapWritable as the base type
------------------------------------------------------------------------

PType<Pair<Void, CustomNonWritableClass>> derivedType = typeFamily.derived(
    (Class<Pair<Void, CustomNonWritableClass>>) Pair.of(null, null).getClass(),
    new MapFn<MapWritable, Pair<Void, CustomNonWritableClass>>() {
        public Pair<Void, CustomNonWritableClass> map(MapWritable input) { ... }
    },
    new MapFn<Pair<Void, CustomNonWritableClass>, MapWritable>() {
        public MapWritable map(Pair<Void, CustomNonWritableClass> input) { ... }
    },
    typeFamily.records(MapWritable.class)
);

public class CustomDataSource
        extends FileTableSourceImpl<Void, CustomNonWritableClass> {

    public CustomDataSource() {
        super(new Path("xsource"),
              (PTableType<Void, CustomNonWritableClass>) derivedType,
              FormatBundle.forInput(CustomInputFormat.class));
    }
    ...
}

This implementation fails before submitting the job with the following
error:

Exception in thread "main" java.lang.ClassCastException:
org.apache.crunch.types.writable.WritableType cannot be cast to
org.apache.crunch.types.PTableType
      at com.xxx.xxx.CustomDataSource.<init>(CustomDataSource.java:...)



Implementation 2: Derive PType<CustomNonWritableClass> using MapWritable as
the base type
------------------------------------------------------------------------

public static class MapWritableToCustomNonWritableClass
        extends MapFn<MapWritable, CustomNonWritableClass> {
    public CustomNonWritableClass map(MapWritable input) { ... }
}

public static class CustomNonWritableClassToMapWritable
        extends MapFn<CustomNonWritableClass, MapWritable> {
    public MapWritable map(CustomNonWritableClass input) { ... }
}

PType<CustomNonWritableClass> derivedType = typeFamily.derived(
   CustomNonWritableClass.class,
   new MapWritableToCustomNonWritableClass(),
   new CustomNonWritableClassToMapWritable(),
   typeFamily.records(MapWritable.class)
);

public class CustomDataSource extends FileSourceImpl<CustomNonWritableClass> {

    public CustomDataSource() {
        super(new Path("xsource"),
              derivedType,
              FormatBundle.forInput(CustomInputFormat.class));
    }
    ...
}

When run, this gets submitted to the cluster, but the MR job fails with:

2014-06-10 10:31:23,653 FATAL [IPC Server handler 2 on 9290]
org.apache.hadoop.mapred.TaskAttemptListenerImpl: Task:
attempt_1401786307497_0078_m_000000_0 - exited :
java.lang.ClassCastException: com.xxx.xxx..CustomNonWritableClass cannot be
cast to org.apache.hadoop.io.MapWritable
     at
com.xxx.xxx.MapWritabToCustomNonWritableClass.map(MapWritabToCustomNonWritableClass.java:1)
     at org.apache.crunch.fn.CompositeMapFn.map(CompositeMapFn.java:63)
     at org.apache.crunch.MapFn.process(MapFn.java:34)
     at org.apache.crunch.impl.mr.run.RTNode.process(RTNode.java:99)
     at org.apache.crunch.impl.mr.run.RTNode.process(RTNode.java:110)
     at org.apache.crunch.impl.mr.run.CrunchMapper.map(CrunchMapper.java:60)
     at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:145)

Thanks,
Christian

Re: Create custom Crunch source (CustomDataSource) for CustomInputFormat

Posted by Josh Wills <jw...@cloudera.com>.
On Thu, Jun 12, 2014 at 10:58 AM, Christian Tzolov <
christian.tzolov@gmail.com> wrote:

> P.S. Between your post and today my son was born! Have a drink or two on
> our behalf ;)
>

Hey, that's awesome! Congratulations!!


-- 
Director of Data Science
Cloudera <http://www.cloudera.com>
Twitter: @josh_wills <http://twitter.com/josh_wills>

Re: Create custom Crunch source (CustomDataSource) for CustomInputFormat

Posted by Christian Tzolov <ch...@gmail.com>.
Hi Josh,

Good to see you too. Thanks for the HFileSource reference. The converter
did the trick, and I can read the data using the custom InputFormat. Now I
am struggling with the NonWritableType, as I have no control over its
implementation and the latter provides no means of serialization. There are
a couple of ideas to explore (one is sketched below for reference), but
those are not Crunch-related.
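
One of those ideas, sketched for reference (it is Hadoop-level rather than
Crunch-level): a thin Hadoop Serialization wrapper around the class. This is
untested and assumes, hypothetically, that the class's state can be captured
and restored through some toBytes()/fromBytes()-style accessors, which is
exactly the part I still have to figure out:

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

import org.apache.hadoop.io.serializer.Deserializer;
import org.apache.hadoop.io.serializer.Serialization;
import org.apache.hadoop.io.serializer.Serializer;

public class CustomSerialization implements Serialization<CustomNonWritableClass> {

    public boolean accept(Class<?> c) {
        return CustomNonWritableClass.class.isAssignableFrom(c);
    }

    public Serializer<CustomNonWritableClass> getSerializer(Class<CustomNonWritableClass> c) {
        return new Serializer<CustomNonWritableClass>() {
            private DataOutputStream out;
            public void open(OutputStream out) { this.out = new DataOutputStream(out); }
            public void serialize(CustomNonWritableClass t) throws IOException {
                byte[] bytes = toBytes(t);   // hypothetical accessor
                out.writeInt(bytes.length);  // length-prefix each record
                out.write(bytes);
            }
            public void close() throws IOException { out.close(); }
        };
    }

    public Deserializer<CustomNonWritableClass> getDeserializer(Class<CustomNonWritableClass> c) {
        return new Deserializer<CustomNonWritableClass>() {
            private DataInputStream in;
            public void open(InputStream in) { this.in = new DataInputStream(in); }
            public CustomNonWritableClass deserialize(CustomNonWritableClass t) throws IOException {
                byte[] bytes = new byte[in.readInt()];
                in.readFully(bytes);
                return fromBytes(bytes);     // hypothetical accessor
            }
            public void close() throws IOException { in.close(); }
        };
    }

    // Stand-ins for whatever access the third-party class actually provides.
    private static byte[] toBytes(CustomNonWritableClass t) { /* TODO */ return null; }
    private static CustomNonWritableClass fromBytes(byte[] bytes) { /* TODO */ return null; }
}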

Thanks again for the helpful information.

Cheers, Chris

P.S. Between your post and today my son was born! Have a drink or two on
our behalf ;)



Re: Create custom Crunch source (CustomDataSource) for CustomInputFormat

Posted by Josh Wills <jw...@cloudera.com>.
Hey Christian,

Good to see you again, I hope all is well. This is a complex setup, but the
good news is that we had to do it before for HBase 0.96, which also returns
non-Writable values in an InputFormat. The code you're going to want to use
as your reference is the HFileSource in crunch-hbase:

https://github.com/apache/crunch/blob/master/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileSource.java

A few comments:
1) First, ignore the Void key; instead of trying to return a PTable, you're
going to return a PCollection<NonWritableType> as the result of this
source. The key to doing that is the Converter implementation that is
associated with the Source; look at the HBaseValueConverter (which is just
a simple pass-through value converter) to see how we do it for HFileSource.
The key thing to note in that converter is that its applyPTypeTransforms()
method returns false; this means that when reading/writing data using that
Converter, we don't apply the map functions from the PType associated with
the source (which is the right thing to do here as well). There's a sketch
of the shape I mean after these comments.
2) I'm assuming there is some Hadoop Serialization that supports the
non-Writable value type, and that Hadoop has to be configured to use it; be
sure to override the configureSource() method in your Source to add that
serialization to the Job configuration (again, see how it's done in
HFileSource.configureSource, and the sketch below).
3) Given all that, the PType for your non-writable class that is associated
with the source should primarily be concerned with how to serialize it
during a shuffle or a read/write from another input format (like Avro, or
SequenceFile, or whatever), as we do in HBaseTypes. It won't actually be
used for reading/writing from the custom input format.
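
To make 1) and 2) concrete, here is a rough, untested sketch of the shape I
mean. All the Custom* names are hypothetical stand-ins, it's modeled on
HBaseValueConverter/HFileSource, and it assumes the Converter and Source
method signatures in current Crunch, so double-check it against the real
interfaces:

import java.io.IOException;

import org.apache.crunch.io.FormatBundle;
import org.apache.crunch.io.impl.FileSourceImpl;
import org.apache.crunch.types.Converter;
import org.apache.crunch.types.PType;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;

// Pass-through converter: hands the InputFormat's value straight to the
// pipeline and tells Crunch not to run the PType's map functions on it.
public class CustomValueConverter implements
        Converter<Object, CustomNonWritableClass,
                  CustomNonWritableClass, Iterable<CustomNonWritableClass>> {

    public CustomNonWritableClass convertInput(Object key, CustomNonWritableClass value) {
        return value;  // the Void key is simply dropped
    }

    public Iterable<CustomNonWritableClass> convertIterableInput(
            Object key, Iterable<CustomNonWritableClass> value) {
        return value;
    }

    public Object outputKey(CustomNonWritableClass value) {
        return NullWritable.get();
    }

    public CustomNonWritableClass outputValue(CustomNonWritableClass value) {
        return value;
    }

    public Class<Object> getKeyClass() {
        return Object.class;
    }

    public Class<CustomNonWritableClass> getValueClass() {
        return CustomNonWritableClass.class;
    }

    public boolean applyPTypeTransforms() {
        return false;  // the crucial bit from comment 1)
    }
}

// A plain (non-table) source that uses the converter above and registers
// the Serialization for the non-Writable value type on the Job.
public class CustomDataSource extends FileSourceImpl<CustomNonWritableClass> {

    public CustomDataSource(Path path, PType<CustomNonWritableClass> ptype) {
        super(path, ptype, FormatBundle.forInput(CustomInputFormat.class));
    }

    @Override
    public Converter<?, ?, ?, ?> getConverter() {
        return new CustomValueConverter();
    }

    @Override
    public void configureSource(Job job, int inputId) throws IOException {
        super.configureSource(job, inputId);
        // Append whatever Serialization handles CustomNonWritableClass
        // (CustomSerialization here is a hypothetical name).
        Configuration conf = job.getConfiguration();
        conf.set("io.serializations", conf.get("io.serializations")
                + "," + CustomSerialization.class.getName());
    }
}

With that in place, the PType you construct only matters for shuffles and
for reads/writes against other formats, exactly as in 3).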

Hope that helps.

J




-- 
Director of Data Science
Cloudera <http://www.cloudera.com>
Twitter: @josh_wills <http://twitter.com/josh_wills>