You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@crunch.apache.org by Lucy Chen <lu...@gmail.com> on 2015/05/13 21:22:11 UTC

question about join

Hi all,

         I had a join step in my crunch pipeline, and it looks like the
following:

//get label data

PType<Labels> LabelsType = Avros.records(Labels.class);

PCollection<Labels> training_labels = input.parallelDo(new
LabelDataParser(), LabelsType);

PTable<String, Labels> labels_data = training_labels.

 parallelDo(new KeyOnLabels("sample_ID"), tableOf(strings(), LabelsType));

//get features

PType<Feats> FeatsType = Avros.records(Feats.class);

PCollection<Feats> training_feats = Feature.FeatLoader(pipeline,
sample_features_inputs);

PTable<String, Feats> feats_data = training_feats.parallelDo(new KeyOnFeats(
"sample_ID"), tableOf(strings(), FeatsType));


//join labels and features

JoinStrategy<String, Labels, Feats> strategy = new
DefaultJoinStrategy<String, Labels, Feats>(20);

PTable<String, Pair<Labels, Feats>> joined_training = strategy.

join(labels_data, feats_data, JoinType.INNER_JOIN);


//class Labels

public class Labels implements java.io.Serializable, Cloneable{

 private String class_ID;

private String sample_ID;

private int binary_ind;

 public Labels()

{

this(null, null, 0);

}

         public Labels(String class_ID, String sample_ID, int ind)

{

this.class_ID = class_ID;

this.sample_ID = sample_ID;

this.binary_ind = ind;

}

        ...

}


//class Feats


public class *Feats* implements java.io.Serializable, Cloneable{

 private String sample_id;

private String sample_name;

private Map<String, Float> feat;

 public Feats()

{

this(null, null, null);

}

 public Feats(String id, String name, Map<String, Float> feat)

{

this.sample_id = id;

this.sample_name = name;

this.feat = feat;

 }

       ...


}


   The outputs of labels_data and feats_data are both fine; but the join
step throws the following exception:


Error: java.lang.ClassCastException: java.lang.String cannot be cast to
org.apache.crunch.Pair at
org.apache.crunch.lib.join.DefaultJoinStrategy$1.map(DefaultJoinStrategy.java:87)
at org.apache.crunch.MapFn.process(MapFn.java:34) at
org.apache.crunch.impl.mr.run.RTNode.process(RTNode.java:98) at
org.apache.crunch.impl.mr.emit.IntermediateEmitter.emit(IntermediateEmitter.java:56)
at org.apache.crunch.MapFn.process(MapFn.java:34) at
org.apache.crunch.impl.mr.run.RTNode.process(RTNode.java:98) at
org.apache.crunch.impl.mr.run.RTNode.process(RTNode.java:109) at
org.apache.crunch.impl.mr.run.CrunchMapper.map(CrunchMapper.java:60) at
org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:145) at
org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:763) at
org.apache.hadoop.mapred.MapTask.run(MapTask.java:339) at
org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:162) at
java.security.AccessController.doPrivileged(Native Method) at
javax.security.auth.Subject.doAs(Subject.java:415) at
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1491)
at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:157)


       This issue already bothered me for a while; Did any one get a
similar issue here? Is there another option that will solve it?


      Btw, I already successfully ran the following joining job:


JoinStrategy<String, Float, Tuple3<String, String, Float>> strategy = new
DefaultJoinStrategy<String, Float, Tuple3<String, String, Float>>(100);

 PTable<String, Pair<Float,Tuple3<String, String, Float>>> joined =
strategy.join(input_A, input_B, JoinType.INNER_JOIN);


   So I guess the issue may be still related to the Avro types that I
defined.


        Thanks for your advice.


Lucy

Re: question about join

Posted by Lucy Chen <lu...@gmail.com>.
Hi Josh,

         Thanks for your quick response. You are right. It should be
training_labels.write(At.avroFile(output_path+"/training_labels_avro",
LabelsType), WriteMode.OVERWRITE) instead. Sorry about the mistake. After
another round of diagnosis with toy data, the codes suddenly turned out to
work after I tried to store the avro file. Although I don't know how it
turned out to work from not working, now it works without any changes.
Probably some weird thing happened.

        Thanks again for your time and patience.

Best,
Lucy

On Fri, May 15, 2015 at 12:05 PM, Lucy Chen <lu...@gmail.com>
wrote:

> Hi Josh,
>
>          Now looks like the issue probably happened with the Class Labels.
> I tried to store the training_labels as an avro output (txt outputs are
> OK),
>
> training_labels.write(At.avroFile(output_path+"/training_labels_avro"),
> LabelsType, WriteMode.OVERWRITE);
>
>
>             but get the following  err:
>
> The method write(Target, Target.WriteMode) in the type PCollection<Labels>
> is not applicable for the arguments (SourceTarget<GenericData.Record>,
> PType<Labels>, Target.WriteMode)
>
>
>               Will get the similar issue when storing the labels_data
> which is the first input of join. It seemed that Crunch did not recognize
> the type of training_labels. But why here it treated it a
> GenericData.Record?
>
>
>               Any suggestions?
>
>
>               Thanks!
>
>
> Lucy
>
> On Wed, May 13, 2015 at 12:22 PM, Lucy Chen <lu...@gmail.com>
> wrote:
>
>> Hi all,
>>
>>          I had a join step in my crunch pipeline, and it looks like the
>> following:
>>
>> //get label data
>>
>> PType<Labels> LabelsType = Avros.records(Labels.class);
>>
>> PCollection<Labels> training_labels = input.parallelDo(new
>> LabelDataParser(), LabelsType);
>>
>> PTable<String, Labels> labels_data = training_labels.
>>
>>  parallelDo(new KeyOnLabels("sample_ID"), tableOf(strings(),
>> LabelsType));
>>
>> //get features
>>
>> PType<Feats> FeatsType = Avros.records(Feats.class);
>>
>> PCollection<Feats> training_feats = Feature.FeatLoader(pipeline,
>> sample_features_inputs);
>>
>> PTable<String, Feats> feats_data = training_feats.parallelDo(new
>> KeyOnFeats("sample_ID"), tableOf(strings(), FeatsType));
>>
>>
>> //join labels and features
>>
>> JoinStrategy<String, Labels, Feats> strategy = new
>> DefaultJoinStrategy<String, Labels, Feats>(20);
>>
>> PTable<String, Pair<Labels, Feats>> joined_training = strategy.
>>
>> join(labels_data, feats_data, JoinType.INNER_JOIN);
>>
>>
>> //class Labels
>>
>> public class Labels implements java.io.Serializable, Cloneable{
>>
>>  private String class_ID;
>>
>> private String sample_ID;
>>
>> private int binary_ind;
>>
>>  public Labels()
>>
>> {
>>
>> this(null, null, 0);
>>
>> }
>>
>>          public Labels(String class_ID, String sample_ID, int ind)
>>
>> {
>>
>> this.class_ID = class_ID;
>>
>> this.sample_ID = sample_ID;
>>
>> this.binary_ind = ind;
>>
>> }
>>
>>         ...
>>
>> }
>>
>>
>> //class Feats
>>
>>
>> public class *Feats* implements java.io.Serializable, Cloneable{
>>
>>  private String sample_id;
>>
>> private String sample_name;
>>
>> private Map<String, Float> feat;
>>
>>  public Feats()
>>
>> {
>>
>> this(null, null, null);
>>
>> }
>>
>>  public Feats(String id, String name, Map<String, Float> feat)
>>
>> {
>>
>> this.sample_id = id;
>>
>> this.sample_name = name;
>>
>> this.feat = feat;
>>
>>  }
>>
>>        ...
>>
>>
>> }
>>
>>
>>    The outputs of labels_data and feats_data are both fine; but the join
>> step throws the following exception:
>>
>>
>> Error: java.lang.ClassCastException: java.lang.String cannot be cast to
>> org.apache.crunch.Pair at
>> org.apache.crunch.lib.join.DefaultJoinStrategy$1.map(DefaultJoinStrategy.java:87)
>> at org.apache.crunch.MapFn.process(MapFn.java:34) at
>> org.apache.crunch.impl.mr.run.RTNode.process(RTNode.java:98) at
>> org.apache.crunch.impl.mr.emit.IntermediateEmitter.emit(IntermediateEmitter.java:56)
>> at org.apache.crunch.MapFn.process(MapFn.java:34) at
>> org.apache.crunch.impl.mr.run.RTNode.process(RTNode.java:98) at
>> org.apache.crunch.impl.mr.run.RTNode.process(RTNode.java:109) at
>> org.apache.crunch.impl.mr.run.CrunchMapper.map(CrunchMapper.java:60) at
>> org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:145) at
>> org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:763) at
>> org.apache.hadoop.mapred.MapTask.run(MapTask.java:339) at
>> org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:162) at
>> java.security.AccessController.doPrivileged(Native Method) at
>> javax.security.auth.Subject.doAs(Subject.java:415) at
>> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1491)
>> at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:157)
>>
>>
>>        This issue already bothered me for a while; Did any one get a
>> similar issue here? Is there another option that will solve it?
>>
>>
>>       Btw, I already successfully ran the following joining job:
>>
>>
>> JoinStrategy<String, Float, Tuple3<String, String, Float>> strategy = new
>> DefaultJoinStrategy<String, Float, Tuple3<String, String, Float>>(100);
>>
>>  PTable<String, Pair<Float,Tuple3<String, String, Float>>> joined =
>> strategy.join(input_A, input_B, JoinType.INNER_JOIN);
>>
>>
>>    So I guess the issue may be still related to the Avro types that I
>> defined.
>>
>>
>>         Thanks for your advice.
>>
>>
>> Lucy
>>
>>
>>
>>
>>
>

Re: question about join

Posted by Josh Wills <jw...@cloudera.com>.
Hey Lucy,

The write() method on PCollection doesn't take a PType argument; it gets it
directly from the PType of the PCollection.
training_labels.write(At.avroFile(...), WriteMode.OVERWRITE) will work.

J

On Fri, May 15, 2015 at 12:05 PM, Lucy Chen <lu...@gmail.com>
wrote:

> Hi Josh,
>
>          Now looks like the issue probably happened with the Class Labels.
> I tried to store the training_labels as an avro output (txt outputs are
> OK),
>
> training_labels.write(At.avroFile(output_path+"/training_labels_avro"),
> LabelsType, WriteMode.OVERWRITE);
>
>
>             but get the following  err:
>
> The method write(Target, Target.WriteMode) in the type PCollection<Labels>
> is not applicable for the arguments (SourceTarget<GenericData.Record>,
> PType<Labels>, Target.WriteMode)
>
>
>               Will get the similar issue when storing the labels_data
> which is the first input of join. It seemed that Crunch did not recognize
> the type of training_labels. But why here it treated it a
> GenericData.Record?
>
>
>               Any suggestions?
>
>
>               Thanks!
>
>
> Lucy
>
> On Wed, May 13, 2015 at 12:22 PM, Lucy Chen <lu...@gmail.com>
> wrote:
>
>> Hi all,
>>
>>          I had a join step in my crunch pipeline, and it looks like the
>> following:
>>
>> //get label data
>>
>> PType<Labels> LabelsType = Avros.records(Labels.class);
>>
>> PCollection<Labels> training_labels = input.parallelDo(new
>> LabelDataParser(), LabelsType);
>>
>> PTable<String, Labels> labels_data = training_labels.
>>
>>  parallelDo(new KeyOnLabels("sample_ID"), tableOf(strings(),
>> LabelsType));
>>
>> //get features
>>
>> PType<Feats> FeatsType = Avros.records(Feats.class);
>>
>> PCollection<Feats> training_feats = Feature.FeatLoader(pipeline,
>> sample_features_inputs);
>>
>> PTable<String, Feats> feats_data = training_feats.parallelDo(new
>> KeyOnFeats("sample_ID"), tableOf(strings(), FeatsType));
>>
>>
>> //join labels and features
>>
>> JoinStrategy<String, Labels, Feats> strategy = new
>> DefaultJoinStrategy<String, Labels, Feats>(20);
>>
>> PTable<String, Pair<Labels, Feats>> joined_training = strategy.
>>
>> join(labels_data, feats_data, JoinType.INNER_JOIN);
>>
>>
>> //class Labels
>>
>> public class Labels implements java.io.Serializable, Cloneable{
>>
>>  private String class_ID;
>>
>> private String sample_ID;
>>
>> private int binary_ind;
>>
>>  public Labels()
>>
>> {
>>
>> this(null, null, 0);
>>
>> }
>>
>>          public Labels(String class_ID, String sample_ID, int ind)
>>
>> {
>>
>> this.class_ID = class_ID;
>>
>> this.sample_ID = sample_ID;
>>
>> this.binary_ind = ind;
>>
>> }
>>
>>         ...
>>
>> }
>>
>>
>> //class Feats
>>
>>
>> public class *Feats* implements java.io.Serializable, Cloneable{
>>
>>  private String sample_id;
>>
>> private String sample_name;
>>
>> private Map<String, Float> feat;
>>
>>  public Feats()
>>
>> {
>>
>> this(null, null, null);
>>
>> }
>>
>>  public Feats(String id, String name, Map<String, Float> feat)
>>
>> {
>>
>> this.sample_id = id;
>>
>> this.sample_name = name;
>>
>> this.feat = feat;
>>
>>  }
>>
>>        ...
>>
>>
>> }
>>
>>
>>    The outputs of labels_data and feats_data are both fine; but the join
>> step throws the following exception:
>>
>>
>> Error: java.lang.ClassCastException: java.lang.String cannot be cast to
>> org.apache.crunch.Pair at
>> org.apache.crunch.lib.join.DefaultJoinStrategy$1.map(DefaultJoinStrategy.java:87)
>> at org.apache.crunch.MapFn.process(MapFn.java:34) at
>> org.apache.crunch.impl.mr.run.RTNode.process(RTNode.java:98) at
>> org.apache.crunch.impl.mr.emit.IntermediateEmitter.emit(IntermediateEmitter.java:56)
>> at org.apache.crunch.MapFn.process(MapFn.java:34) at
>> org.apache.crunch.impl.mr.run.RTNode.process(RTNode.java:98) at
>> org.apache.crunch.impl.mr.run.RTNode.process(RTNode.java:109) at
>> org.apache.crunch.impl.mr.run.CrunchMapper.map(CrunchMapper.java:60) at
>> org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:145) at
>> org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:763) at
>> org.apache.hadoop.mapred.MapTask.run(MapTask.java:339) at
>> org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:162) at
>> java.security.AccessController.doPrivileged(Native Method) at
>> javax.security.auth.Subject.doAs(Subject.java:415) at
>> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1491)
>> at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:157)
>>
>>
>>        This issue already bothered me for a while; Did any one get a
>> similar issue here? Is there another option that will solve it?
>>
>>
>>       Btw, I already successfully ran the following joining job:
>>
>>
>> JoinStrategy<String, Float, Tuple3<String, String, Float>> strategy = new
>> DefaultJoinStrategy<String, Float, Tuple3<String, String, Float>>(100);
>>
>>  PTable<String, Pair<Float,Tuple3<String, String, Float>>> joined =
>> strategy.join(input_A, input_B, JoinType.INNER_JOIN);
>>
>>
>>    So I guess the issue may be still related to the Avro types that I
>> defined.
>>
>>
>>         Thanks for your advice.
>>
>>
>> Lucy
>>
>>
>>
>>
>>
>


-- 
Director of Data Science
Cloudera <http://www.cloudera.com>
Twitter: @josh_wills <http://twitter.com/josh_wills>

Re: question about join

Posted by Lucy Chen <lu...@gmail.com>.
Hi Josh,

         Now looks like the issue probably happened with the Class Labels.
I tried to store the training_labels as an avro output (txt outputs are
OK),

training_labels.write(At.avroFile(output_path+"/training_labels_avro"),
LabelsType, WriteMode.OVERWRITE);


            but get the following  err:

The method write(Target, Target.WriteMode) in the type PCollection<Labels>
is not applicable for the arguments (SourceTarget<GenericData.Record>,
PType<Labels>, Target.WriteMode)


              Will get the similar issue when storing the labels_data which
is the first input of join. It seemed that Crunch did not recognize the
type of training_labels. But why here it treated it a GenericData.Record?


              Any suggestions?


              Thanks!


Lucy

On Wed, May 13, 2015 at 12:22 PM, Lucy Chen <lu...@gmail.com>
wrote:

> Hi all,
>
>          I had a join step in my crunch pipeline, and it looks like the
> following:
>
> //get label data
>
> PType<Labels> LabelsType = Avros.records(Labels.class);
>
> PCollection<Labels> training_labels = input.parallelDo(new
> LabelDataParser(), LabelsType);
>
> PTable<String, Labels> labels_data = training_labels.
>
>  parallelDo(new KeyOnLabels("sample_ID"), tableOf(strings(), LabelsType));
>
> //get features
>
> PType<Feats> FeatsType = Avros.records(Feats.class);
>
> PCollection<Feats> training_feats = Feature.FeatLoader(pipeline,
> sample_features_inputs);
>
> PTable<String, Feats> feats_data = training_feats.parallelDo(new
> KeyOnFeats("sample_ID"), tableOf(strings(), FeatsType));
>
>
> //join labels and features
>
> JoinStrategy<String, Labels, Feats> strategy = new
> DefaultJoinStrategy<String, Labels, Feats>(20);
>
> PTable<String, Pair<Labels, Feats>> joined_training = strategy.
>
> join(labels_data, feats_data, JoinType.INNER_JOIN);
>
>
> //class Labels
>
> public class Labels implements java.io.Serializable, Cloneable{
>
>  private String class_ID;
>
> private String sample_ID;
>
> private int binary_ind;
>
>  public Labels()
>
> {
>
> this(null, null, 0);
>
> }
>
>          public Labels(String class_ID, String sample_ID, int ind)
>
> {
>
> this.class_ID = class_ID;
>
> this.sample_ID = sample_ID;
>
> this.binary_ind = ind;
>
> }
>
>         ...
>
> }
>
>
> //class Feats
>
>
> public class *Feats* implements java.io.Serializable, Cloneable{
>
>  private String sample_id;
>
> private String sample_name;
>
> private Map<String, Float> feat;
>
>  public Feats()
>
> {
>
> this(null, null, null);
>
> }
>
>  public Feats(String id, String name, Map<String, Float> feat)
>
> {
>
> this.sample_id = id;
>
> this.sample_name = name;
>
> this.feat = feat;
>
>  }
>
>        ...
>
>
> }
>
>
>    The outputs of labels_data and feats_data are both fine; but the join
> step throws the following exception:
>
>
> Error: java.lang.ClassCastException: java.lang.String cannot be cast to
> org.apache.crunch.Pair at
> org.apache.crunch.lib.join.DefaultJoinStrategy$1.map(DefaultJoinStrategy.java:87)
> at org.apache.crunch.MapFn.process(MapFn.java:34) at
> org.apache.crunch.impl.mr.run.RTNode.process(RTNode.java:98) at
> org.apache.crunch.impl.mr.emit.IntermediateEmitter.emit(IntermediateEmitter.java:56)
> at org.apache.crunch.MapFn.process(MapFn.java:34) at
> org.apache.crunch.impl.mr.run.RTNode.process(RTNode.java:98) at
> org.apache.crunch.impl.mr.run.RTNode.process(RTNode.java:109) at
> org.apache.crunch.impl.mr.run.CrunchMapper.map(CrunchMapper.java:60) at
> org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:145) at
> org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:763) at
> org.apache.hadoop.mapred.MapTask.run(MapTask.java:339) at
> org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:162) at
> java.security.AccessController.doPrivileged(Native Method) at
> javax.security.auth.Subject.doAs(Subject.java:415) at
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1491)
> at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:157)
>
>
>        This issue already bothered me for a while; Did any one get a
> similar issue here? Is there another option that will solve it?
>
>
>       Btw, I already successfully ran the following joining job:
>
>
> JoinStrategy<String, Float, Tuple3<String, String, Float>> strategy = new
> DefaultJoinStrategy<String, Float, Tuple3<String, String, Float>>(100);
>
>  PTable<String, Pair<Float,Tuple3<String, String, Float>>> joined =
> strategy.join(input_A, input_B, JoinType.INNER_JOIN);
>
>
>    So I guess the issue may be still related to the Avro types that I
> defined.
>
>
>         Thanks for your advice.
>
>
> Lucy
>
>
>
>
>

Re: question about join

Posted by Lucy Chen <lu...@gmail.com>.
Hi Josh,

       Thanks for your advice. I ran your testing case and the join
function looks fine. No exception from it. Let me do another round of
diagnosis and see whether I can find the issue.

Lucy

[foo,[StringWrapper [value=foo],StringWrapper [value=foo]]]

[wills,[StringWrapper [value=wills],StringWrapper [value=wills]]]

On Thu, May 14, 2015 at 6:27 AM, Josh Wills <jo...@gmail.com> wrote:

> We have some testing helper code in the project (the dependency is named
> crunch-test-0.11.0-hadoop2) that we use for running integration tests in
> the main project. The snippet I sent was a modification of this one:
>
>
> https://github.com/apache/crunch/blob/master/crunch-core/src/it/java/org/apache/crunch/io/avro/AvroReflectIT.java
>
> On Wed, May 13, 2015 at 11:02 PM, Lucy Chen <lu...@gmail.com>
> wrote:
>
>> Hi Josh,
>>
>>          I am using crunch-core-0.11.0-hadoop2.jar; I did not find a
>> class AvroReflectIT under org.apache.crunch.io.avro. Is it in a new
>> version? Meanwhile how should I set the variable tmpDir in my code? What
>> do you mean by "done inside of AvroReflectIT"? You mean copy paste the
>> above code within the class AvroReflectIT and run it?
>>
>>          Thanks!
>>
>> Lucy
>>
>> On Wed, May 13, 2015 at 6:09 PM, Josh Wills <jw...@cloudera.com> wrote:
>>
>>> Hey Lucy,
>>>
>>> I tried to see if I could replicate the error by writing the following
>>> integration test (done inside of AvroReflectIT)-- does something like this
>>> fail when you run it? And you're using Crunch 0.11-hadoop2, or something
>>> earlier?
>>>
>>> @Test
>>> public void testJoinReflectedData() throws Exception {
>>>   Pipeline pipeline = new MRPipeline(AvroReflectIT.class, tmpDir.getDefaultConfiguration());
>>>   AvroType<StringWrapper> atype = Avros.records(StringWrapper.class);
>>>   PCollection<StringWrapper> p1 = pipeline.create(ImmutableList.of(
>>>     new StringWrapper("josh"), new StringWrapper("wills"), new StringWrapper("foo")), atype);
>>>   PCollection<StringWrapper> p2 = pipeline.create(ImmutableList.of(
>>>           new StringWrapper("mike"), new StringWrapper("wills"), new StringWrapper("foo")), atype);
>>>   PTable<String, StringWrapper> pt1 = p1.parallelDo(new MapFn<StringWrapper, Pair<String, StringWrapper>>() {
>>>     @Override
>>>     public Pair<String, StringWrapper> map(StringWrapper input) {
>>>       return Pair.of(input.getValue(), input);
>>>     }
>>>   }, Avros.tableOf(Avros.strings(), atype));
>>>   PTable<String, StringWrapper> pt2 = p2.parallelDo(new MapFn<StringWrapper, Pair<String, StringWrapper>>() {
>>>     @Override
>>>     public Pair<String, StringWrapper> map(StringWrapper input) {
>>>       return Pair.of(input.getValue(), input);
>>>     }
>>>   }, Avros.tableOf(Avros.strings(), atype));
>>>   JoinStrategy<String, StringWrapper, StringWrapper> joinStrategy = new DefaultJoinStrategy<String, StringWrapper, StringWrapper>();
>>>   PTable<String, Pair<StringWrapper, StringWrapper>> res = joinStrategy.join(pt1, pt2, JoinType.INNER_JOIN);
>>>   for (Pair<String, Pair<StringWrapper, StringWrapper>> p : res.materialize()) {
>>>     System.out.println(p);
>>>   }
>>> }
>>>
>>>
>>> On Wed, May 13, 2015 at 3:10 PM, Lucy Chen <lu...@gmail.com>
>>> wrote:
>>>
>>>> Hi,
>>>>
>>>>       I tried to dump the members from Label class to a TupleN, and
>>>> dump only sample_id and sample_name from Feats class to a Pair, and then do
>>>> the joining,
>>>>
>>>> JoinStrategy<String, TupleN, Pair<String, String>> strategy
>>>>
>>>> = new DefaultJoinStrategy<String, TupleN, Pair<String, String>>(20);
>>>>
>>>> PTable<String, Pair<TupleN, Pair<String, String>>> joined_training =
>>>> strategy.
>>>>
>>>> join(labels_data, feats_data, JoinType.INNER_JOIN);
>>>>
>>>>
>>>> The data dump step is good; I already verified the outputs.
>>>>
>>>> And still get the same exception from the joining. Looks like that it
>>>> is definitely not the issue of the Avro type that I defined. Now just got
>>>> stuck here...
>>>>
>>>>
>>>> Error: java.lang.ClassCastException: java.lang.String cannot be cast to
>>>> org.apache.crunch.Pair at
>>>> org.apache.crunch.lib.join.DefaultJoinStrategy$2.map(DefaultJoinStrategy.java:94)
>>>> at org.apache.crunch.MapFn.process(MapFn.java:34) at
>>>> org.apache.crunch.impl.mr.run.RTNode.process(RTNode.java:98) at
>>>> org.apache.crunch.impl.mr.emit.IntermediateEmitter.emit(IntermediateEmitter.java:56)
>>>> at org.apache.crunch.MapFn.process(MapFn.java:34) at
>>>> org.apache.crunch.impl.mr.run.RTNode.process(RTNode.java:98) at
>>>> org.apache.crunch.impl.mr.run.RTNode.process(RTNode.java:109) at
>>>> org.apache.crunch.impl.mr.run.CrunchMapper.map(CrunchMapper.java:60) at
>>>> org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:145) at
>>>> org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:763) at
>>>> org.apache.hadoop.mapred.MapTask.run(MapTask.java:339) at
>>>> org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:162) at
>>>> java.security.AccessController.doPrivileged(Native Method) at
>>>> javax.security.auth.Subject.doAs(Subject.java:415) at
>>>> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1491)
>>>> at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:157)
>>>>
>>>> On Wed, May 13, 2015 at 2:05 PM, Lucy Chen <lu...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi Josh,
>>>>>
>>>>>          Those two functions bring the keys for each PCollections so
>>>>> that they can join in the next step. Here is the functions look like:
>>>>>
>>>>> public class KeyOnLabels extends DoFn<Labels, Pair<String, Labels>>{
>>>>>
>>>>>  private String key;
>>>>>
>>>>>  public KeyOnLabels(String input)
>>>>>
>>>>> {
>>>>>
>>>>> this.key = input;
>>>>>
>>>>> }
>>>>>
>>>>>  @Override
>>>>>
>>>>> public void process(Labels input, Emitter<Pair<String, Labels>>
>>>>> emitter)
>>>>>
>>>>> {
>>>>>
>>>>> if(key.equalsIgnoreCase("class_ID"))
>>>>>
>>>>>    emitter.emit(Pair.of(input.getClassID(), input));
>>>>>
>>>>> else if(key.equalsIgnoreCase("sample_ID"))
>>>>>
>>>>> emitter.emit(Pair.of(input.getSampleID(), input));
>>>>>
>>>>> else
>>>>>
>>>>> emitter.emit(Pair.of(Integer.toString(input.getBinaryIndicator()),
>>>>> input));
>>>>>
>>>>>  }
>>>>>
>>>>>
>>>>> }
>>>>>
>>>>> public class KeyOnFeats extends DoFn<Feats, Pair<String, Feats>>{
>>>>>
>>>>>  private final static Logger logger = Logger
>>>>>
>>>>>       .getLogger(KeyOnFeats.class.getName());
>>>>>
>>>>> private String key;
>>>>>
>>>>> public KeyOnFeats(String input)
>>>>>
>>>>> {
>>>>>
>>>>> this.key = input;
>>>>>
>>>>> }
>>>>>
>>>>> @Override
>>>>>
>>>>> public void process(Feats input, Emitter<Pair<String, Feats>> emitter)
>>>>>
>>>>> {
>>>>>
>>>>> if(key.equals("sample_ID"))
>>>>>
>>>>> emitter.emit(Pair.of(input.getSampleID(), input));
>>>>>
>>>>> else
>>>>>
>>>>> logger.error("The key should be specified as sample_ID only!");
>>>>>
>>>>> }
>>>>>
>>>>>
>>>>> }
>>>>>
>>>>>
>>>>> Meanwhile I checked the two outputs and they looked like
>>>>> normal.Something like the follows:
>>>>>
>>>>>
>>>>> {"key": "ID1", "value": Feats@14f5e86}
>>>>>
>>>>>
>>>>> {"key": "ID1", "value": Labels@489983f3}
>>>>>
>>>>>
>>>>>     That's why I feel something weird happened in the join step.
>>>>>
>>>>>
>>>>>     Thanks!
>>>>>
>>>>>
>>>>> Lucy
>>>>>
>>>>> On Wed, May 13, 2015 at 12:34 PM, Josh Wills <jo...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> What do the KeyOnLabels and KeyOnFeats functions return? From the
>>>>>> error, it looks like the labels_data PTable is actually a
>>>>>> PCollection<String>, which would happen if KeyOnLabels was somehow
>>>>>> returning a String in some situations despite claiming to return a
>>>>>> Pair<String, Labels>.
>>>>>>
>>>>>> On Wed, May 13, 2015 at 12:31 PM, Lucy Chen <
>>>>>> lucychen2014fall@gmail.com> wrote:
>>>>>>
>>>>>>> Meanwhile, I just realized that one of my joining jobs look also OK,
>>>>>>> which also had the Avro type in it:
>>>>>>>
>>>>>>> JoinStrategy<String, Feats, Feats> strategy
>>>>>>>
>>>>>>> = new DefaultJoinStrategy<String, Feats, Feats>(50);
>>>>>>>
>>>>>>> PTable<String, Pair<Feats,Feats>> joined = strategy.join(input_A,
>>>>>>> input_B, JoinType.INNER_JOIN);
>>>>>>>
>>>>>>>
>>>>>>> So the Avro type probably is not the issue.
>>>>>>>
>>>>>>>
>>>>>>> Any advice?
>>>>>>>
>>>>>>>
>>>>>>> Lucy
>>>>>>>
>>>>>>> On Wed, May 13, 2015 at 12:22 PM, Lucy Chen <
>>>>>>> lucychen2014fall@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hi all,
>>>>>>>>
>>>>>>>>          I had a join step in my crunch pipeline, and it looks like
>>>>>>>> the following:
>>>>>>>>
>>>>>>>> //get label data
>>>>>>>>
>>>>>>>> PType<Labels> LabelsType = Avros.records(Labels.class);
>>>>>>>>
>>>>>>>> PCollection<Labels> training_labels = input.parallelDo(new
>>>>>>>> LabelDataParser(), LabelsType);
>>>>>>>>
>>>>>>>> PTable<String, Labels> labels_data = training_labels.
>>>>>>>>
>>>>>>>>  parallelDo(new KeyOnLabels("sample_ID"), tableOf(strings(),
>>>>>>>> LabelsType));
>>>>>>>>
>>>>>>>> //get features
>>>>>>>>
>>>>>>>> PType<Feats> FeatsType = Avros.records(Feats.class);
>>>>>>>>
>>>>>>>> PCollection<Feats> training_feats = Feature.FeatLoader(pipeline,
>>>>>>>> sample_features_inputs);
>>>>>>>>
>>>>>>>> PTable<String, Feats> feats_data = training_feats.parallelDo(new
>>>>>>>> KeyOnFeats("sample_ID"), tableOf(strings(), FeatsType));
>>>>>>>>
>>>>>>>>
>>>>>>>> //join labels and features
>>>>>>>>
>>>>>>>> JoinStrategy<String, Labels, Feats> strategy = new
>>>>>>>> DefaultJoinStrategy<String, Labels, Feats>(20);
>>>>>>>>
>>>>>>>> PTable<String, Pair<Labels, Feats>> joined_training = strategy.
>>>>>>>>
>>>>>>>> join(labels_data, feats_data, JoinType.INNER_JOIN);
>>>>>>>>
>>>>>>>>
>>>>>>>> //class Labels
>>>>>>>>
>>>>>>>> public class Labels implements java.io.Serializable, Cloneable{
>>>>>>>>
>>>>>>>>  private String class_ID;
>>>>>>>>
>>>>>>>> private String sample_ID;
>>>>>>>>
>>>>>>>> private int binary_ind;
>>>>>>>>
>>>>>>>>  public Labels()
>>>>>>>>
>>>>>>>> {
>>>>>>>>
>>>>>>>> this(null, null, 0);
>>>>>>>>
>>>>>>>> }
>>>>>>>>
>>>>>>>>          public Labels(String class_ID, String sample_ID, int ind)
>>>>>>>>
>>>>>>>> {
>>>>>>>>
>>>>>>>> this.class_ID = class_ID;
>>>>>>>>
>>>>>>>> this.sample_ID = sample_ID;
>>>>>>>>
>>>>>>>> this.binary_ind = ind;
>>>>>>>>
>>>>>>>> }
>>>>>>>>
>>>>>>>>         ...
>>>>>>>>
>>>>>>>> }
>>>>>>>>
>>>>>>>>
>>>>>>>> //class Feats
>>>>>>>>
>>>>>>>>
>>>>>>>> public class *Feats* implements java.io.Serializable, Cloneable{
>>>>>>>>
>>>>>>>>  private String sample_id;
>>>>>>>>
>>>>>>>> private String sample_name;
>>>>>>>>
>>>>>>>> private Map<String, Float> feat;
>>>>>>>>
>>>>>>>>  public Feats()
>>>>>>>>
>>>>>>>> {
>>>>>>>>
>>>>>>>> this(null, null, null);
>>>>>>>>
>>>>>>>> }
>>>>>>>>
>>>>>>>>  public Feats(String id, String name, Map<String, Float> feat)
>>>>>>>>
>>>>>>>> {
>>>>>>>>
>>>>>>>> this.sample_id = id;
>>>>>>>>
>>>>>>>> this.sample_name = name;
>>>>>>>>
>>>>>>>> this.feat = feat;
>>>>>>>>
>>>>>>>>  }
>>>>>>>>
>>>>>>>>        ...
>>>>>>>>
>>>>>>>>
>>>>>>>> }
>>>>>>>>
>>>>>>>>
>>>>>>>>    The outputs of labels_data and feats_data are both fine; but
>>>>>>>> the join step throws the following exception:
>>>>>>>>
>>>>>>>>
>>>>>>>> Error: java.lang.ClassCastException: java.lang.String cannot be
>>>>>>>> cast to org.apache.crunch.Pair at
>>>>>>>> org.apache.crunch.lib.join.DefaultJoinStrategy$1.map(DefaultJoinStrategy.java:87)
>>>>>>>> at org.apache.crunch.MapFn.process(MapFn.java:34) at
>>>>>>>> org.apache.crunch.impl.mr.run.RTNode.process(RTNode.java:98) at
>>>>>>>> org.apache.crunch.impl.mr.emit.IntermediateEmitter.emit(IntermediateEmitter.java:56)
>>>>>>>> at org.apache.crunch.MapFn.process(MapFn.java:34) at
>>>>>>>> org.apache.crunch.impl.mr.run.RTNode.process(RTNode.java:98) at
>>>>>>>> org.apache.crunch.impl.mr.run.RTNode.process(RTNode.java:109) at
>>>>>>>> org.apache.crunch.impl.mr.run.CrunchMapper.map(CrunchMapper.java:60) at
>>>>>>>> org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:145) at
>>>>>>>> org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:763) at
>>>>>>>> org.apache.hadoop.mapred.MapTask.run(MapTask.java:339) at
>>>>>>>> org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:162) at
>>>>>>>> java.security.AccessController.doPrivileged(Native Method) at
>>>>>>>> javax.security.auth.Subject.doAs(Subject.java:415) at
>>>>>>>> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1491)
>>>>>>>> at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:157)
>>>>>>>>
>>>>>>>>
>>>>>>>>        This issue already bothered me for a while; Did any one get
>>>>>>>> a similar issue here? Is there another option that will solve it?
>>>>>>>>
>>>>>>>>
>>>>>>>>       Btw, I already successfully ran the following joining job:
>>>>>>>>
>>>>>>>>
>>>>>>>> JoinStrategy<String, Float, Tuple3<String, String, Float>> strategy
>>>>>>>> = new DefaultJoinStrategy<String, Float, Tuple3<String, String,
>>>>>>>> Float>>(100);
>>>>>>>>
>>>>>>>>  PTable<String, Pair<Float,Tuple3<String, String, Float>>> joined
>>>>>>>> = strategy.join(input_A, input_B, JoinType.INNER_JOIN);
>>>>>>>>
>>>>>>>>
>>>>>>>>    So I guess the issue may be still related to the Avro types
>>>>>>>> that I defined.
>>>>>>>>
>>>>>>>>
>>>>>>>>         Thanks for your advice.
>>>>>>>>
>>>>>>>>
>>>>>>>> Lucy
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>>
>>> --
>>> Director of Data Science
>>> Cloudera <http://www.cloudera.com>
>>> Twitter: @josh_wills <http://twitter.com/josh_wills>
>>>
>>
>>
>

Re: question about join

Posted by Josh Wills <jo...@gmail.com>.
We have some testing helper code in the project (the dependency is named
crunch-test-0.11.0-hadoop2) that we use for running integration tests in
the main project. The snippet I sent was a modification of this one:

https://github.com/apache/crunch/blob/master/crunch-core/src/it/java/org/apache/crunch/io/avro/AvroReflectIT.java

On Wed, May 13, 2015 at 11:02 PM, Lucy Chen <lu...@gmail.com>
wrote:

> Hi Josh,
>
>          I am using crunch-core-0.11.0-hadoop2.jar; I did not find a class
> AvroReflectIT under org.apache.crunch.io.avro. Is it in a new version?
> Meanwhile how should I set the variable tmpDir in my code? What do you
> mean by "done inside of AvroReflectIT"? You mean copy paste the above
> code within the class AvroReflectIT and run it?
>
>          Thanks!
>
> Lucy
>
> On Wed, May 13, 2015 at 6:09 PM, Josh Wills <jw...@cloudera.com> wrote:
>
>> Hey Lucy,
>>
>> I tried to see if I could replicate the error by writing the following
>> integration test (done inside of AvroReflectIT)-- does something like this
>> fail when you run it? And you're using Crunch 0.11-hadoop2, or something
>> earlier?
>>
>> @Test
>> public void testJoinReflectedData() throws Exception {
>>   Pipeline pipeline = new MRPipeline(AvroReflectIT.class, tmpDir.getDefaultConfiguration());
>>   AvroType<StringWrapper> atype = Avros.records(StringWrapper.class);
>>   PCollection<StringWrapper> p1 = pipeline.create(ImmutableList.of(
>>     new StringWrapper("josh"), new StringWrapper("wills"), new StringWrapper("foo")), atype);
>>   PCollection<StringWrapper> p2 = pipeline.create(ImmutableList.of(
>>           new StringWrapper("mike"), new StringWrapper("wills"), new StringWrapper("foo")), atype);
>>   PTable<String, StringWrapper> pt1 = p1.parallelDo(new MapFn<StringWrapper, Pair<String, StringWrapper>>() {
>>     @Override
>>     public Pair<String, StringWrapper> map(StringWrapper input) {
>>       return Pair.of(input.getValue(), input);
>>     }
>>   }, Avros.tableOf(Avros.strings(), atype));
>>   PTable<String, StringWrapper> pt2 = p2.parallelDo(new MapFn<StringWrapper, Pair<String, StringWrapper>>() {
>>     @Override
>>     public Pair<String, StringWrapper> map(StringWrapper input) {
>>       return Pair.of(input.getValue(), input);
>>     }
>>   }, Avros.tableOf(Avros.strings(), atype));
>>   JoinStrategy<String, StringWrapper, StringWrapper> joinStrategy = new DefaultJoinStrategy<String, StringWrapper, StringWrapper>();
>>   PTable<String, Pair<StringWrapper, StringWrapper>> res = joinStrategy.join(pt1, pt2, JoinType.INNER_JOIN);
>>   for (Pair<String, Pair<StringWrapper, StringWrapper>> p : res.materialize()) {
>>     System.out.println(p);
>>   }
>> }
>>
>>
>> On Wed, May 13, 2015 at 3:10 PM, Lucy Chen <lu...@gmail.com>
>> wrote:
>>
>>> Hi,
>>>
>>>       I tried to dump the members from Label class to a TupleN, and dump
>>> only sample_id and sample_name from Feats class to a Pair, and then do the
>>> joining,
>>>
>>> JoinStrategy<String, TupleN, Pair<String, String>> strategy
>>>
>>> = new DefaultJoinStrategy<String, TupleN, Pair<String, String>>(20);
>>>
>>> PTable<String, Pair<TupleN, Pair<String, String>>> joined_training =
>>> strategy.
>>>
>>> join(labels_data, feats_data, JoinType.INNER_JOIN);
>>>
>>>
>>> The data dump step is good; I already verified the outputs.
>>>
>>> And still get the same exception from the joining. Looks like that it is
>>> definitely not the issue of the Avro type that I defined. Now just got
>>> stuck here...
>>>
>>>
>>> Error: java.lang.ClassCastException: java.lang.String cannot be cast to
>>> org.apache.crunch.Pair at
>>> org.apache.crunch.lib.join.DefaultJoinStrategy$2.map(DefaultJoinStrategy.java:94)
>>> at org.apache.crunch.MapFn.process(MapFn.java:34) at
>>> org.apache.crunch.impl.mr.run.RTNode.process(RTNode.java:98) at
>>> org.apache.crunch.impl.mr.emit.IntermediateEmitter.emit(IntermediateEmitter.java:56)
>>> at org.apache.crunch.MapFn.process(MapFn.java:34) at
>>> org.apache.crunch.impl.mr.run.RTNode.process(RTNode.java:98) at
>>> org.apache.crunch.impl.mr.run.RTNode.process(RTNode.java:109) at
>>> org.apache.crunch.impl.mr.run.CrunchMapper.map(CrunchMapper.java:60) at
>>> org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:145) at
>>> org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:763) at
>>> org.apache.hadoop.mapred.MapTask.run(MapTask.java:339) at
>>> org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:162) at
>>> java.security.AccessController.doPrivileged(Native Method) at
>>> javax.security.auth.Subject.doAs(Subject.java:415) at
>>> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1491)
>>> at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:157)
>>>
>>> On Wed, May 13, 2015 at 2:05 PM, Lucy Chen <lu...@gmail.com>
>>> wrote:
>>>
>>>> Hi Josh,
>>>>
>>>>          Those two functions bring the keys for each PCollections so
>>>> that they can join in the next step. Here is the functions look like:
>>>>
>>>> public class KeyOnLabels extends DoFn<Labels, Pair<String, Labels>>{
>>>>
>>>>  private String key;
>>>>
>>>>  public KeyOnLabels(String input)
>>>>
>>>> {
>>>>
>>>> this.key = input;
>>>>
>>>> }
>>>>
>>>>  @Override
>>>>
>>>> public void process(Labels input, Emitter<Pair<String, Labels>>
>>>> emitter)
>>>>
>>>> {
>>>>
>>>> if(key.equalsIgnoreCase("class_ID"))
>>>>
>>>>    emitter.emit(Pair.of(input.getClassID(), input));
>>>>
>>>> else if(key.equalsIgnoreCase("sample_ID"))
>>>>
>>>> emitter.emit(Pair.of(input.getSampleID(), input));
>>>>
>>>> else
>>>>
>>>> emitter.emit(Pair.of(Integer.toString(input.getBinaryIndicator()),
>>>> input));
>>>>
>>>>  }
>>>>
>>>>
>>>> }
>>>>
>>>> public class KeyOnFeats extends DoFn<Feats, Pair<String, Feats>>{
>>>>
>>>>  private final static Logger logger = Logger
>>>>
>>>>       .getLogger(KeyOnFeats.class.getName());
>>>>
>>>> private String key;
>>>>
>>>> public KeyOnFeats(String input)
>>>>
>>>> {
>>>>
>>>> this.key = input;
>>>>
>>>> }
>>>>
>>>> @Override
>>>>
>>>> public void process(Feats input, Emitter<Pair<String, Feats>> emitter)
>>>>
>>>> {
>>>>
>>>> if(key.equals("sample_ID"))
>>>>
>>>> emitter.emit(Pair.of(input.getSampleID(), input));
>>>>
>>>> else
>>>>
>>>> logger.error("The key should be specified as sample_ID only!");
>>>>
>>>> }
>>>>
>>>>
>>>> }
>>>>
>>>>
>>>> Meanwhile I checked the two outputs and they looked like
>>>> normal.Something like the follows:
>>>>
>>>>
>>>> {"key": "ID1", "value": Feats@14f5e86}
>>>>
>>>>
>>>> {"key": "ID1", "value": Labels@489983f3}
>>>>
>>>>
>>>>     That's why I feel something weird happened in the join step.
>>>>
>>>>
>>>>     Thanks!
>>>>
>>>>
>>>> Lucy
>>>>
>>>> On Wed, May 13, 2015 at 12:34 PM, Josh Wills <jo...@gmail.com>
>>>> wrote:
>>>>
>>>>> What do the KeyOnLabels and KeyOnFeats functions return? From the
>>>>> error, it looks like the labels_data PTable is actually a
>>>>> PCollection<String>, which would happen if KeyOnLabels was somehow
>>>>> returning a String in some situations despite claiming to return a
>>>>> Pair<String, Labels>.
>>>>>
>>>>> On Wed, May 13, 2015 at 12:31 PM, Lucy Chen <
>>>>> lucychen2014fall@gmail.com> wrote:
>>>>>
>>>>>> Meanwhile, I just realized that one of my joining jobs look also OK,
>>>>>> which also had the Avro type in it:
>>>>>>
>>>>>> JoinStrategy<String, Feats, Feats> strategy
>>>>>>
>>>>>> = new DefaultJoinStrategy<String, Feats, Feats>(50);
>>>>>>
>>>>>> PTable<String, Pair<Feats,Feats>> joined = strategy.join(input_A,
>>>>>> input_B, JoinType.INNER_JOIN);
>>>>>>
>>>>>>
>>>>>> So the Avro type probably is not the issue.
>>>>>>
>>>>>>
>>>>>> Any advice?
>>>>>>
>>>>>>
>>>>>> Lucy
>>>>>>
>>>>>> On Wed, May 13, 2015 at 12:22 PM, Lucy Chen <
>>>>>> lucychen2014fall@gmail.com> wrote:
>>>>>>
>>>>>>> Hi all,
>>>>>>>
>>>>>>>          I had a join step in my crunch pipeline, and it looks like
>>>>>>> the following:
>>>>>>>
>>>>>>> //get label data
>>>>>>>
>>>>>>> PType<Labels> LabelsType = Avros.records(Labels.class);
>>>>>>>
>>>>>>> PCollection<Labels> training_labels = input.parallelDo(new
>>>>>>> LabelDataParser(), LabelsType);
>>>>>>>
>>>>>>> PTable<String, Labels> labels_data = training_labels.
>>>>>>>
>>>>>>>  parallelDo(new KeyOnLabels("sample_ID"), tableOf(strings(),
>>>>>>> LabelsType));
>>>>>>>
>>>>>>> //get features
>>>>>>>
>>>>>>> PType<Feats> FeatsType = Avros.records(Feats.class);
>>>>>>>
>>>>>>> PCollection<Feats> training_feats = Feature.FeatLoader(pipeline,
>>>>>>> sample_features_inputs);
>>>>>>>
>>>>>>> PTable<String, Feats> feats_data = training_feats.parallelDo(new
>>>>>>> KeyOnFeats("sample_ID"), tableOf(strings(), FeatsType));
>>>>>>>
>>>>>>>
>>>>>>> //join labels and features
>>>>>>>
>>>>>>> JoinStrategy<String, Labels, Feats> strategy = new
>>>>>>> DefaultJoinStrategy<String, Labels, Feats>(20);
>>>>>>>
>>>>>>> PTable<String, Pair<Labels, Feats>> joined_training = strategy.
>>>>>>>
>>>>>>> join(labels_data, feats_data, JoinType.INNER_JOIN);
>>>>>>>
>>>>>>>
>>>>>>> //class Labels
>>>>>>>
>>>>>>> public class Labels implements java.io.Serializable, Cloneable{
>>>>>>>
>>>>>>>  private String class_ID;
>>>>>>>
>>>>>>> private String sample_ID;
>>>>>>>
>>>>>>> private int binary_ind;
>>>>>>>
>>>>>>>  public Labels()
>>>>>>>
>>>>>>> {
>>>>>>>
>>>>>>> this(null, null, 0);
>>>>>>>
>>>>>>> }
>>>>>>>
>>>>>>>          public Labels(String class_ID, String sample_ID, int ind)
>>>>>>>
>>>>>>> {
>>>>>>>
>>>>>>> this.class_ID = class_ID;
>>>>>>>
>>>>>>> this.sample_ID = sample_ID;
>>>>>>>
>>>>>>> this.binary_ind = ind;
>>>>>>>
>>>>>>> }
>>>>>>>
>>>>>>>         ...
>>>>>>>
>>>>>>> }
>>>>>>>
>>>>>>>
>>>>>>> //class Feats
>>>>>>>
>>>>>>>
>>>>>>> public class *Feats* implements java.io.Serializable, Cloneable{
>>>>>>>
>>>>>>>  private String sample_id;
>>>>>>>
>>>>>>> private String sample_name;
>>>>>>>
>>>>>>> private Map<String, Float> feat;
>>>>>>>
>>>>>>>  public Feats()
>>>>>>>
>>>>>>> {
>>>>>>>
>>>>>>> this(null, null, null);
>>>>>>>
>>>>>>> }
>>>>>>>
>>>>>>>  public Feats(String id, String name, Map<String, Float> feat)
>>>>>>>
>>>>>>> {
>>>>>>>
>>>>>>> this.sample_id = id;
>>>>>>>
>>>>>>> this.sample_name = name;
>>>>>>>
>>>>>>> this.feat = feat;
>>>>>>>
>>>>>>>  }
>>>>>>>
>>>>>>>        ...
>>>>>>>
>>>>>>>
>>>>>>> }
>>>>>>>
>>>>>>>
>>>>>>>    The outputs of labels_data and feats_data are both fine; but the
>>>>>>> join step throws the following exception:
>>>>>>>
>>>>>>>
>>>>>>> Error: java.lang.ClassCastException: java.lang.String cannot be cast
>>>>>>> to org.apache.crunch.Pair at
>>>>>>> org.apache.crunch.lib.join.DefaultJoinStrategy$1.map(DefaultJoinStrategy.java:87)
>>>>>>> at org.apache.crunch.MapFn.process(MapFn.java:34) at
>>>>>>> org.apache.crunch.impl.mr.run.RTNode.process(RTNode.java:98) at
>>>>>>> org.apache.crunch.impl.mr.emit.IntermediateEmitter.emit(IntermediateEmitter.java:56)
>>>>>>> at org.apache.crunch.MapFn.process(MapFn.java:34) at
>>>>>>> org.apache.crunch.impl.mr.run.RTNode.process(RTNode.java:98) at
>>>>>>> org.apache.crunch.impl.mr.run.RTNode.process(RTNode.java:109) at
>>>>>>> org.apache.crunch.impl.mr.run.CrunchMapper.map(CrunchMapper.java:60) at
>>>>>>> org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:145) at
>>>>>>> org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:763) at
>>>>>>> org.apache.hadoop.mapred.MapTask.run(MapTask.java:339) at
>>>>>>> org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:162) at
>>>>>>> java.security.AccessController.doPrivileged(Native Method) at
>>>>>>> javax.security.auth.Subject.doAs(Subject.java:415) at
>>>>>>> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1491)
>>>>>>> at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:157)
>>>>>>>
>>>>>>>
>>>>>>>        This issue already bothered me for a while; Did any one get
>>>>>>> a similar issue here? Is there another option that will solve it?
>>>>>>>
>>>>>>>
>>>>>>>       Btw, I already successfully ran the following joining job:
>>>>>>>
>>>>>>>
>>>>>>> JoinStrategy<String, Float, Tuple3<String, String, Float>> strategy
>>>>>>> = new DefaultJoinStrategy<String, Float, Tuple3<String, String,
>>>>>>> Float>>(100);
>>>>>>>
>>>>>>>  PTable<String, Pair<Float,Tuple3<String, String, Float>>> joined =
>>>>>>> strategy.join(input_A, input_B, JoinType.INNER_JOIN);
>>>>>>>
>>>>>>>
>>>>>>>    So I guess the issue may be still related to the Avro types that
>>>>>>> I defined.
>>>>>>>
>>>>>>>
>>>>>>>         Thanks for your advice.
>>>>>>>
>>>>>>>
>>>>>>> Lucy
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>>
>> --
>> Director of Data Science
>> Cloudera <http://www.cloudera.com>
>> Twitter: @josh_wills <http://twitter.com/josh_wills>
>>
>
>

Re: question about join

Posted by Lucy Chen <lu...@gmail.com>.
Hi Josh,

         I am using crunch-core-0.11.0-hadoop2.jar; I did not find a class
AvroReflectIT under org.apache.crunch.io.avro. Is it in a new version?
Meanwhile how should I set the variable tmpDir in my code? What do you mean
by "done inside of AvroReflectIT"? You mean copy paste the above code
within the class AvroReflectIT and run it?

         Thanks!

Lucy

On Wed, May 13, 2015 at 6:09 PM, Josh Wills <jw...@cloudera.com> wrote:

> Hey Lucy,
>
> I tried to see if I could replicate the error by writing the following
> integration test (done inside of AvroReflectIT)-- does something like this
> fail when you run it? And you're using Crunch 0.11-hadoop2, or something
> earlier?
>
> @Test
> public void testJoinReflectedData() throws Exception {
>   Pipeline pipeline = new MRPipeline(AvroReflectIT.class, tmpDir.getDefaultConfiguration());
>   AvroType<StringWrapper> atype = Avros.records(StringWrapper.class);
>   PCollection<StringWrapper> p1 = pipeline.create(ImmutableList.of(
>     new StringWrapper("josh"), new StringWrapper("wills"), new StringWrapper("foo")), atype);
>   PCollection<StringWrapper> p2 = pipeline.create(ImmutableList.of(
>           new StringWrapper("mike"), new StringWrapper("wills"), new StringWrapper("foo")), atype);
>   PTable<String, StringWrapper> pt1 = p1.parallelDo(new MapFn<StringWrapper, Pair<String, StringWrapper>>() {
>     @Override
>     public Pair<String, StringWrapper> map(StringWrapper input) {
>       return Pair.of(input.getValue(), input);
>     }
>   }, Avros.tableOf(Avros.strings(), atype));
>   PTable<String, StringWrapper> pt2 = p2.parallelDo(new MapFn<StringWrapper, Pair<String, StringWrapper>>() {
>     @Override
>     public Pair<String, StringWrapper> map(StringWrapper input) {
>       return Pair.of(input.getValue(), input);
>     }
>   }, Avros.tableOf(Avros.strings(), atype));
>   JoinStrategy<String, StringWrapper, StringWrapper> joinStrategy = new DefaultJoinStrategy<String, StringWrapper, StringWrapper>();
>   PTable<String, Pair<StringWrapper, StringWrapper>> res = joinStrategy.join(pt1, pt2, JoinType.INNER_JOIN);
>   for (Pair<String, Pair<StringWrapper, StringWrapper>> p : res.materialize()) {
>     System.out.println(p);
>   }
> }
>
>
> On Wed, May 13, 2015 at 3:10 PM, Lucy Chen <lu...@gmail.com>
> wrote:
>
>> Hi,
>>
>>       I tried to dump the members from Label class to a TupleN, and dump
>> only sample_id and sample_name from Feats class to a Pair, and then do the
>> joining,
>>
>> JoinStrategy<String, TupleN, Pair<String, String>> strategy
>>
>> = new DefaultJoinStrategy<String, TupleN, Pair<String, String>>(20);
>>
>> PTable<String, Pair<TupleN, Pair<String, String>>> joined_training =
>> strategy.
>>
>> join(labels_data, feats_data, JoinType.INNER_JOIN);
>>
>>
>> The data dump step is good; I already verified the outputs.
>>
>> And still get the same exception from the joining. Looks like that it is
>> definitely not the issue of the Avro type that I defined. Now just got
>> stuck here...
>>
>>
>> Error: java.lang.ClassCastException: java.lang.String cannot be cast to
>> org.apache.crunch.Pair at
>> org.apache.crunch.lib.join.DefaultJoinStrategy$2.map(DefaultJoinStrategy.java:94)
>> at org.apache.crunch.MapFn.process(MapFn.java:34) at
>> org.apache.crunch.impl.mr.run.RTNode.process(RTNode.java:98) at
>> org.apache.crunch.impl.mr.emit.IntermediateEmitter.emit(IntermediateEmitter.java:56)
>> at org.apache.crunch.MapFn.process(MapFn.java:34) at
>> org.apache.crunch.impl.mr.run.RTNode.process(RTNode.java:98) at
>> org.apache.crunch.impl.mr.run.RTNode.process(RTNode.java:109) at
>> org.apache.crunch.impl.mr.run.CrunchMapper.map(CrunchMapper.java:60) at
>> org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:145) at
>> org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:763) at
>> org.apache.hadoop.mapred.MapTask.run(MapTask.java:339) at
>> org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:162) at
>> java.security.AccessController.doPrivileged(Native Method) at
>> javax.security.auth.Subject.doAs(Subject.java:415) at
>> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1491)
>> at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:157)
>>
>> On Wed, May 13, 2015 at 2:05 PM, Lucy Chen <lu...@gmail.com>
>> wrote:
>>
>>> Hi Josh,
>>>
>>>          Those two functions bring the keys for each PCollections so
>>> that they can join in the next step. Here is the functions look like:
>>>
>>> public class KeyOnLabels extends DoFn<Labels, Pair<String, Labels>>{
>>>
>>>  private String key;
>>>
>>>  public KeyOnLabels(String input)
>>>
>>> {
>>>
>>> this.key = input;
>>>
>>> }
>>>
>>>  @Override
>>>
>>> public void process(Labels input, Emitter<Pair<String, Labels>> emitter)
>>>
>>> {
>>>
>>> if(key.equalsIgnoreCase("class_ID"))
>>>
>>>    emitter.emit(Pair.of(input.getClassID(), input));
>>>
>>> else if(key.equalsIgnoreCase("sample_ID"))
>>>
>>> emitter.emit(Pair.of(input.getSampleID(), input));
>>>
>>> else
>>>
>>> emitter.emit(Pair.of(Integer.toString(input.getBinaryIndicator()),
>>> input));
>>>
>>>  }
>>>
>>>
>>> }
>>>
>>> public class KeyOnFeats extends DoFn<Feats, Pair<String, Feats>>{
>>>
>>>  private final static Logger logger = Logger
>>>
>>>       .getLogger(KeyOnFeats.class.getName());
>>>
>>> private String key;
>>>
>>> public KeyOnFeats(String input)
>>>
>>> {
>>>
>>> this.key = input;
>>>
>>> }
>>>
>>> @Override
>>>
>>> public void process(Feats input, Emitter<Pair<String, Feats>> emitter)
>>>
>>> {
>>>
>>> if(key.equals("sample_ID"))
>>>
>>> emitter.emit(Pair.of(input.getSampleID(), input));
>>>
>>> else
>>>
>>> logger.error("The key should be specified as sample_ID only!");
>>>
>>> }
>>>
>>>
>>> }
>>>
>>>
>>> Meanwhile I checked the two outputs and they looked like
>>> normal.Something like the follows:
>>>
>>>
>>> {"key": "ID1", "value": Feats@14f5e86}
>>>
>>>
>>> {"key": "ID1", "value": Labels@489983f3}
>>>
>>>
>>>     That's why I feel something weird happened in the join step.
>>>
>>>
>>>     Thanks!
>>>
>>>
>>> Lucy
>>>
>>> On Wed, May 13, 2015 at 12:34 PM, Josh Wills <jo...@gmail.com>
>>> wrote:
>>>
>>>> What do the KeyOnLabels and KeyOnFeats functions return? From the
>>>> error, it looks like the labels_data PTable is actually a
>>>> PCollection<String>, which would happen if KeyOnLabels was somehow
>>>> returning a String in some situations despite claiming to return a
>>>> Pair<String, Labels>.
>>>>
>>>> On Wed, May 13, 2015 at 12:31 PM, Lucy Chen <lucychen2014fall@gmail.com
>>>> > wrote:
>>>>
>>>>> Meanwhile, I just realized that one of my joining jobs look also OK,
>>>>> which also had the Avro type in it:
>>>>>
>>>>> JoinStrategy<String, Feats, Feats> strategy
>>>>>
>>>>> = new DefaultJoinStrategy<String, Feats, Feats>(50);
>>>>>
>>>>> PTable<String, Pair<Feats,Feats>> joined = strategy.join(input_A,
>>>>> input_B, JoinType.INNER_JOIN);
>>>>>
>>>>>
>>>>> So the Avro type probably is not the issue.
>>>>>
>>>>>
>>>>> Any advice?
>>>>>
>>>>>
>>>>> Lucy
>>>>>
>>>>> On Wed, May 13, 2015 at 12:22 PM, Lucy Chen <
>>>>> lucychen2014fall@gmail.com> wrote:
>>>>>
>>>>>> Hi all,
>>>>>>
>>>>>>          I had a join step in my crunch pipeline, and it looks like
>>>>>> the following:
>>>>>>
>>>>>> //get label data
>>>>>>
>>>>>> PType<Labels> LabelsType = Avros.records(Labels.class);
>>>>>>
>>>>>> PCollection<Labels> training_labels = input.parallelDo(new
>>>>>> LabelDataParser(), LabelsType);
>>>>>>
>>>>>> PTable<String, Labels> labels_data = training_labels.
>>>>>>
>>>>>>  parallelDo(new KeyOnLabels("sample_ID"), tableOf(strings(),
>>>>>> LabelsType));
>>>>>>
>>>>>> //get features
>>>>>>
>>>>>> PType<Feats> FeatsType = Avros.records(Feats.class);
>>>>>>
>>>>>> PCollection<Feats> training_feats = Feature.FeatLoader(pipeline,
>>>>>> sample_features_inputs);
>>>>>>
>>>>>> PTable<String, Feats> feats_data = training_feats.parallelDo(new
>>>>>> KeyOnFeats("sample_ID"), tableOf(strings(), FeatsType));
>>>>>>
>>>>>>
>>>>>> //join labels and features
>>>>>>
>>>>>> JoinStrategy<String, Labels, Feats> strategy = new
>>>>>> DefaultJoinStrategy<String, Labels, Feats>(20);
>>>>>>
>>>>>> PTable<String, Pair<Labels, Feats>> joined_training = strategy.
>>>>>>
>>>>>> join(labels_data, feats_data, JoinType.INNER_JOIN);
>>>>>>
>>>>>>
>>>>>> //class Labels
>>>>>>
>>>>>> public class Labels implements java.io.Serializable, Cloneable{
>>>>>>
>>>>>>  private String class_ID;
>>>>>>
>>>>>> private String sample_ID;
>>>>>>
>>>>>> private int binary_ind;
>>>>>>
>>>>>>  public Labels()
>>>>>>
>>>>>> {
>>>>>>
>>>>>> this(null, null, 0);
>>>>>>
>>>>>> }
>>>>>>
>>>>>>          public Labels(String class_ID, String sample_ID, int ind)
>>>>>>
>>>>>> {
>>>>>>
>>>>>> this.class_ID = class_ID;
>>>>>>
>>>>>> this.sample_ID = sample_ID;
>>>>>>
>>>>>> this.binary_ind = ind;
>>>>>>
>>>>>> }
>>>>>>
>>>>>>         ...
>>>>>>
>>>>>> }
>>>>>>
>>>>>>
>>>>>> //class Feats
>>>>>>
>>>>>>
>>>>>> public class *Feats* implements java.io.Serializable, Cloneable{
>>>>>>
>>>>>>  private String sample_id;
>>>>>>
>>>>>> private String sample_name;
>>>>>>
>>>>>> private Map<String, Float> feat;
>>>>>>
>>>>>>  public Feats()
>>>>>>
>>>>>> {
>>>>>>
>>>>>> this(null, null, null);
>>>>>>
>>>>>> }
>>>>>>
>>>>>>  public Feats(String id, String name, Map<String, Float> feat)
>>>>>>
>>>>>> {
>>>>>>
>>>>>> this.sample_id = id;
>>>>>>
>>>>>> this.sample_name = name;
>>>>>>
>>>>>> this.feat = feat;
>>>>>>
>>>>>>  }
>>>>>>
>>>>>>        ...
>>>>>>
>>>>>>
>>>>>> }
>>>>>>
>>>>>>
>>>>>>    The outputs of labels_data and feats_data are both fine; but the
>>>>>> join step throws the following exception:
>>>>>>
>>>>>>
>>>>>> Error: java.lang.ClassCastException: java.lang.String cannot be cast
>>>>>> to org.apache.crunch.Pair at
>>>>>> org.apache.crunch.lib.join.DefaultJoinStrategy$1.map(DefaultJoinStrategy.java:87)
>>>>>> at org.apache.crunch.MapFn.process(MapFn.java:34) at
>>>>>> org.apache.crunch.impl.mr.run.RTNode.process(RTNode.java:98) at
>>>>>> org.apache.crunch.impl.mr.emit.IntermediateEmitter.emit(IntermediateEmitter.java:56)
>>>>>> at org.apache.crunch.MapFn.process(MapFn.java:34) at
>>>>>> org.apache.crunch.impl.mr.run.RTNode.process(RTNode.java:98) at
>>>>>> org.apache.crunch.impl.mr.run.RTNode.process(RTNode.java:109) at
>>>>>> org.apache.crunch.impl.mr.run.CrunchMapper.map(CrunchMapper.java:60) at
>>>>>> org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:145) at
>>>>>> org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:763) at
>>>>>> org.apache.hadoop.mapred.MapTask.run(MapTask.java:339) at
>>>>>> org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:162) at
>>>>>> java.security.AccessController.doPrivileged(Native Method) at
>>>>>> javax.security.auth.Subject.doAs(Subject.java:415) at
>>>>>> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1491)
>>>>>> at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:157)
>>>>>>
>>>>>>
>>>>>>        This issue already bothered me for a while; Did any one get a
>>>>>> similar issue here? Is there another option that will solve it?
>>>>>>
>>>>>>
>>>>>>       Btw, I already successfully ran the following joining job:
>>>>>>
>>>>>>
>>>>>> JoinStrategy<String, Float, Tuple3<String, String, Float>> strategy =
>>>>>> new DefaultJoinStrategy<String, Float, Tuple3<String, String,
>>>>>> Float>>(100);
>>>>>>
>>>>>>  PTable<String, Pair<Float,Tuple3<String, String, Float>>> joined =
>>>>>> strategy.join(input_A, input_B, JoinType.INNER_JOIN);
>>>>>>
>>>>>>
>>>>>>    So I guess the issue may be still related to the Avro types that
>>>>>> I defined.
>>>>>>
>>>>>>
>>>>>>         Thanks for your advice.
>>>>>>
>>>>>>
>>>>>> Lucy
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>
>
> --
> Director of Data Science
> Cloudera <http://www.cloudera.com>
> Twitter: @josh_wills <http://twitter.com/josh_wills>
>

Re: question about join

Posted by Josh Wills <jw...@cloudera.com>.
Hey Lucy,

I tried to see if I could replicate the error by writing the following
integration test (done inside of AvroReflectIT)-- does something like this
fail when you run it? And you're using Crunch 0.11-hadoop2, or something
earlier?

@Test
public void testJoinReflectedData() throws Exception {
  Pipeline pipeline = new MRPipeline(AvroReflectIT.class,
tmpDir.getDefaultConfiguration());
  AvroType<StringWrapper> atype = Avros.records(StringWrapper.class);
  PCollection<StringWrapper> p1 = pipeline.create(ImmutableList.of(
    new StringWrapper("josh"), new StringWrapper("wills"), new
StringWrapper("foo")), atype);
  PCollection<StringWrapper> p2 = pipeline.create(ImmutableList.of(
          new StringWrapper("mike"), new StringWrapper("wills"), new
StringWrapper("foo")), atype);
  PTable<String, StringWrapper> pt1 = p1.parallelDo(new
MapFn<StringWrapper, Pair<String, StringWrapper>>() {
    @Override
    public Pair<String, StringWrapper> map(StringWrapper input) {
      return Pair.of(input.getValue(), input);
    }
  }, Avros.tableOf(Avros.strings(), atype));
  PTable<String, StringWrapper> pt2 = p2.parallelDo(new
MapFn<StringWrapper, Pair<String, StringWrapper>>() {
    @Override
    public Pair<String, StringWrapper> map(StringWrapper input) {
      return Pair.of(input.getValue(), input);
    }
  }, Avros.tableOf(Avros.strings(), atype));
  JoinStrategy<String, StringWrapper, StringWrapper> joinStrategy =
new DefaultJoinStrategy<String, StringWrapper, StringWrapper>();
  PTable<String, Pair<StringWrapper, StringWrapper>> res =
joinStrategy.join(pt1, pt2, JoinType.INNER_JOIN);
  for (Pair<String, Pair<StringWrapper, StringWrapper>> p : res.materialize()) {
    System.out.println(p);
  }
}


On Wed, May 13, 2015 at 3:10 PM, Lucy Chen <lu...@gmail.com>
wrote:

> Hi,
>
>       I tried to dump the members from Label class to a TupleN, and dump
> only sample_id and sample_name from Feats class to a Pair, and then do the
> joining,
>
> JoinStrategy<String, TupleN, Pair<String, String>> strategy
>
> = new DefaultJoinStrategy<String, TupleN, Pair<String, String>>(20);
>
> PTable<String, Pair<TupleN, Pair<String, String>>> joined_training =
> strategy.
>
> join(labels_data, feats_data, JoinType.INNER_JOIN);
>
>
> The data dump step is good; I already verified the outputs.
>
> And still get the same exception from the joining. Looks like that it is
> definitely not the issue of the Avro type that I defined. Now just got
> stuck here...
>
>
> Error: java.lang.ClassCastException: java.lang.String cannot be cast to
> org.apache.crunch.Pair at
> org.apache.crunch.lib.join.DefaultJoinStrategy$2.map(DefaultJoinStrategy.java:94)
> at org.apache.crunch.MapFn.process(MapFn.java:34) at
> org.apache.crunch.impl.mr.run.RTNode.process(RTNode.java:98) at
> org.apache.crunch.impl.mr.emit.IntermediateEmitter.emit(IntermediateEmitter.java:56)
> at org.apache.crunch.MapFn.process(MapFn.java:34) at
> org.apache.crunch.impl.mr.run.RTNode.process(RTNode.java:98) at
> org.apache.crunch.impl.mr.run.RTNode.process(RTNode.java:109) at
> org.apache.crunch.impl.mr.run.CrunchMapper.map(CrunchMapper.java:60) at
> org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:145) at
> org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:763) at
> org.apache.hadoop.mapred.MapTask.run(MapTask.java:339) at
> org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:162) at
> java.security.AccessController.doPrivileged(Native Method) at
> javax.security.auth.Subject.doAs(Subject.java:415) at
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1491)
> at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:157)
>
> On Wed, May 13, 2015 at 2:05 PM, Lucy Chen <lu...@gmail.com>
> wrote:
>
>> Hi Josh,
>>
>>          Those two functions bring the keys for each PCollections so that
>> they can join in the next step. Here is the functions look like:
>>
>> public class KeyOnLabels extends DoFn<Labels, Pair<String, Labels>>{
>>
>>  private String key;
>>
>>  public KeyOnLabels(String input)
>>
>> {
>>
>> this.key = input;
>>
>> }
>>
>>  @Override
>>
>> public void process(Labels input, Emitter<Pair<String, Labels>> emitter)
>>
>> {
>>
>> if(key.equalsIgnoreCase("class_ID"))
>>
>>    emitter.emit(Pair.of(input.getClassID(), input));
>>
>> else if(key.equalsIgnoreCase("sample_ID"))
>>
>> emitter.emit(Pair.of(input.getSampleID(), input));
>>
>> else
>>
>> emitter.emit(Pair.of(Integer.toString(input.getBinaryIndicator()),
>> input));
>>
>>  }
>>
>>
>> }
>>
>> public class KeyOnFeats extends DoFn<Feats, Pair<String, Feats>>{
>>
>>  private final static Logger logger = Logger
>>
>>       .getLogger(KeyOnFeats.class.getName());
>>
>> private String key;
>>
>> public KeyOnFeats(String input)
>>
>> {
>>
>> this.key = input;
>>
>> }
>>
>> @Override
>>
>> public void process(Feats input, Emitter<Pair<String, Feats>> emitter)
>>
>> {
>>
>> if(key.equals("sample_ID"))
>>
>> emitter.emit(Pair.of(input.getSampleID(), input));
>>
>> else
>>
>> logger.error("The key should be specified as sample_ID only!");
>>
>> }
>>
>>
>> }
>>
>>
>> Meanwhile I checked the two outputs and they looked like normal.Something
>> like the follows:
>>
>>
>> {"key": "ID1", "value": Feats@14f5e86}
>>
>>
>> {"key": "ID1", "value": Labels@489983f3}
>>
>>
>>     That's why I feel something weird happened in the join step.
>>
>>
>>     Thanks!
>>
>>
>> Lucy
>>
>> On Wed, May 13, 2015 at 12:34 PM, Josh Wills <jo...@gmail.com>
>> wrote:
>>
>>> What do the KeyOnLabels and KeyOnFeats functions return? From the error,
>>> it looks like the labels_data PTable is actually a PCollection<String>,
>>> which would happen if KeyOnLabels was somehow returning a String in some
>>> situations despite claiming to return a Pair<String, Labels>.
>>>
>>> On Wed, May 13, 2015 at 12:31 PM, Lucy Chen <lu...@gmail.com>
>>> wrote:
>>>
>>>> Meanwhile, I just realized that one of my joining jobs look also OK,
>>>> which also had the Avro type in it:
>>>>
>>>> JoinStrategy<String, Feats, Feats> strategy
>>>>
>>>> = new DefaultJoinStrategy<String, Feats, Feats>(50);
>>>>
>>>> PTable<String, Pair<Feats,Feats>> joined = strategy.join(input_A,
>>>> input_B, JoinType.INNER_JOIN);
>>>>
>>>>
>>>> So the Avro type probably is not the issue.
>>>>
>>>>
>>>> Any advice?
>>>>
>>>>
>>>> Lucy
>>>>
>>>> On Wed, May 13, 2015 at 12:22 PM, Lucy Chen <lucychen2014fall@gmail.com
>>>> > wrote:
>>>>
>>>>> Hi all,
>>>>>
>>>>>          I had a join step in my crunch pipeline, and it looks like
>>>>> the following:
>>>>>
>>>>> //get label data
>>>>>
>>>>> PType<Labels> LabelsType = Avros.records(Labels.class);
>>>>>
>>>>> PCollection<Labels> training_labels = input.parallelDo(new
>>>>> LabelDataParser(), LabelsType);
>>>>>
>>>>> PTable<String, Labels> labels_data = training_labels.
>>>>>
>>>>>  parallelDo(new KeyOnLabels("sample_ID"), tableOf(strings(),
>>>>> LabelsType));
>>>>>
>>>>> //get features
>>>>>
>>>>> PType<Feats> FeatsType = Avros.records(Feats.class);
>>>>>
>>>>> PCollection<Feats> training_feats = Feature.FeatLoader(pipeline,
>>>>> sample_features_inputs);
>>>>>
>>>>> PTable<String, Feats> feats_data = training_feats.parallelDo(new
>>>>> KeyOnFeats("sample_ID"), tableOf(strings(), FeatsType));
>>>>>
>>>>>
>>>>> //join labels and features
>>>>>
>>>>> JoinStrategy<String, Labels, Feats> strategy = new
>>>>> DefaultJoinStrategy<String, Labels, Feats>(20);
>>>>>
>>>>> PTable<String, Pair<Labels, Feats>> joined_training = strategy.
>>>>>
>>>>> join(labels_data, feats_data, JoinType.INNER_JOIN);
>>>>>
>>>>>
>>>>> //class Labels
>>>>>
>>>>> public class Labels implements java.io.Serializable, Cloneable{
>>>>>
>>>>>  private String class_ID;
>>>>>
>>>>> private String sample_ID;
>>>>>
>>>>> private int binary_ind;
>>>>>
>>>>>  public Labels()
>>>>>
>>>>> {
>>>>>
>>>>> this(null, null, 0);
>>>>>
>>>>> }
>>>>>
>>>>>          public Labels(String class_ID, String sample_ID, int ind)
>>>>>
>>>>> {
>>>>>
>>>>> this.class_ID = class_ID;
>>>>>
>>>>> this.sample_ID = sample_ID;
>>>>>
>>>>> this.binary_ind = ind;
>>>>>
>>>>> }
>>>>>
>>>>>         ...
>>>>>
>>>>> }
>>>>>
>>>>>
>>>>> //class Feats
>>>>>
>>>>>
>>>>> public class *Feats* implements java.io.Serializable, Cloneable{
>>>>>
>>>>>  private String sample_id;
>>>>>
>>>>> private String sample_name;
>>>>>
>>>>> private Map<String, Float> feat;
>>>>>
>>>>>  public Feats()
>>>>>
>>>>> {
>>>>>
>>>>> this(null, null, null);
>>>>>
>>>>> }
>>>>>
>>>>>  public Feats(String id, String name, Map<String, Float> feat)
>>>>>
>>>>> {
>>>>>
>>>>> this.sample_id = id;
>>>>>
>>>>> this.sample_name = name;
>>>>>
>>>>> this.feat = feat;
>>>>>
>>>>>  }
>>>>>
>>>>>        ...
>>>>>
>>>>>
>>>>> }
>>>>>
>>>>>
>>>>>    The outputs of labels_data and feats_data are both fine; but the
>>>>> join step throws the following exception:
>>>>>
>>>>>
>>>>> Error: java.lang.ClassCastException: java.lang.String cannot be cast
>>>>> to org.apache.crunch.Pair at
>>>>> org.apache.crunch.lib.join.DefaultJoinStrategy$1.map(DefaultJoinStrategy.java:87)
>>>>> at org.apache.crunch.MapFn.process(MapFn.java:34) at
>>>>> org.apache.crunch.impl.mr.run.RTNode.process(RTNode.java:98) at
>>>>> org.apache.crunch.impl.mr.emit.IntermediateEmitter.emit(IntermediateEmitter.java:56)
>>>>> at org.apache.crunch.MapFn.process(MapFn.java:34) at
>>>>> org.apache.crunch.impl.mr.run.RTNode.process(RTNode.java:98) at
>>>>> org.apache.crunch.impl.mr.run.RTNode.process(RTNode.java:109) at
>>>>> org.apache.crunch.impl.mr.run.CrunchMapper.map(CrunchMapper.java:60) at
>>>>> org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:145) at
>>>>> org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:763) at
>>>>> org.apache.hadoop.mapred.MapTask.run(MapTask.java:339) at
>>>>> org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:162) at
>>>>> java.security.AccessController.doPrivileged(Native Method) at
>>>>> javax.security.auth.Subject.doAs(Subject.java:415) at
>>>>> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1491)
>>>>> at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:157)
>>>>>
>>>>>
>>>>>        This issue already bothered me for a while; Did any one get a
>>>>> similar issue here? Is there another option that will solve it?
>>>>>
>>>>>
>>>>>       Btw, I already successfully ran the following joining job:
>>>>>
>>>>>
>>>>> JoinStrategy<String, Float, Tuple3<String, String, Float>> strategy =
>>>>> new DefaultJoinStrategy<String, Float, Tuple3<String, String,
>>>>> Float>>(100);
>>>>>
>>>>>  PTable<String, Pair<Float,Tuple3<String, String, Float>>> joined =
>>>>> strategy.join(input_A, input_B, JoinType.INNER_JOIN);
>>>>>
>>>>>
>>>>>    So I guess the issue may be still related to the Avro types that I
>>>>> defined.
>>>>>
>>>>>
>>>>>         Thanks for your advice.
>>>>>
>>>>>
>>>>> Lucy
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>
>>>
>>
>


-- 
Director of Data Science
Cloudera <http://www.cloudera.com>
Twitter: @josh_wills <http://twitter.com/josh_wills>

Re: question about join

Posted by Lucy Chen <lu...@gmail.com>.
Hi,

      I tried to dump the members from Label class to a TupleN, and dump
only sample_id and sample_name from Feats class to a Pair, and then do the
joining,

JoinStrategy<String, TupleN, Pair<String, String>> strategy

= new DefaultJoinStrategy<String, TupleN, Pair<String, String>>(20);

PTable<String, Pair<TupleN, Pair<String, String>>> joined_training =
strategy.

join(labels_data, feats_data, JoinType.INNER_JOIN);


The data dump step is good; I already verified the outputs.

And still get the same exception from the joining. Looks like that it is
definitely not the issue of the Avro type that I defined. Now just got
stuck here...


Error: java.lang.ClassCastException: java.lang.String cannot be cast to
org.apache.crunch.Pair at
org.apache.crunch.lib.join.DefaultJoinStrategy$2.map(DefaultJoinStrategy.java:94)
at org.apache.crunch.MapFn.process(MapFn.java:34) at
org.apache.crunch.impl.mr.run.RTNode.process(RTNode.java:98) at
org.apache.crunch.impl.mr.emit.IntermediateEmitter.emit(IntermediateEmitter.java:56)
at org.apache.crunch.MapFn.process(MapFn.java:34) at
org.apache.crunch.impl.mr.run.RTNode.process(RTNode.java:98) at
org.apache.crunch.impl.mr.run.RTNode.process(RTNode.java:109) at
org.apache.crunch.impl.mr.run.CrunchMapper.map(CrunchMapper.java:60) at
org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:145) at
org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:763) at
org.apache.hadoop.mapred.MapTask.run(MapTask.java:339) at
org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:162) at
java.security.AccessController.doPrivileged(Native Method) at
javax.security.auth.Subject.doAs(Subject.java:415) at
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1491)
at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:157)

On Wed, May 13, 2015 at 2:05 PM, Lucy Chen <lu...@gmail.com>
wrote:

> Hi Josh,
>
>          Those two functions bring the keys for each PCollections so that
> they can join in the next step. Here is the functions look like:
>
> public class KeyOnLabels extends DoFn<Labels, Pair<String, Labels>>{
>
>  private String key;
>
>  public KeyOnLabels(String input)
>
> {
>
> this.key = input;
>
> }
>
>  @Override
>
> public void process(Labels input, Emitter<Pair<String, Labels>> emitter)
>
> {
>
> if(key.equalsIgnoreCase("class_ID"))
>
>    emitter.emit(Pair.of(input.getClassID(), input));
>
> else if(key.equalsIgnoreCase("sample_ID"))
>
> emitter.emit(Pair.of(input.getSampleID(), input));
>
> else
>
> emitter.emit(Pair.of(Integer.toString(input.getBinaryIndicator()), input));
>
>  }
>
>
> }
>
> public class KeyOnFeats extends DoFn<Feats, Pair<String, Feats>>{
>
>  private final static Logger logger = Logger
>
>       .getLogger(KeyOnFeats.class.getName());
>
> private String key;
>
> public KeyOnFeats(String input)
>
> {
>
> this.key = input;
>
> }
>
> @Override
>
> public void process(Feats input, Emitter<Pair<String, Feats>> emitter)
>
> {
>
> if(key.equals("sample_ID"))
>
> emitter.emit(Pair.of(input.getSampleID(), input));
>
> else
>
> logger.error("The key should be specified as sample_ID only!");
>
> }
>
>
> }
>
>
> Meanwhile I checked the two outputs and they looked like normal.Something
> like the follows:
>
>
> {"key": "ID1", "value": Feats@14f5e86}
>
>
> {"key": "ID1", "value": Labels@489983f3}
>
>
>     That's why I feel something weird happened in the join step.
>
>
>     Thanks!
>
>
> Lucy
>
> On Wed, May 13, 2015 at 12:34 PM, Josh Wills <jo...@gmail.com> wrote:
>
>> What do the KeyOnLabels and KeyOnFeats functions return? From the error,
>> it looks like the labels_data PTable is actually a PCollection<String>,
>> which would happen if KeyOnLabels was somehow returning a String in some
>> situations despite claiming to return a Pair<String, Labels>.
>>
>> On Wed, May 13, 2015 at 12:31 PM, Lucy Chen <lu...@gmail.com>
>> wrote:
>>
>>> Meanwhile, I just realized that one of my joining jobs look also OK,
>>> which also had the Avro type in it:
>>>
>>> JoinStrategy<String, Feats, Feats> strategy
>>>
>>> = new DefaultJoinStrategy<String, Feats, Feats>(50);
>>>
>>> PTable<String, Pair<Feats,Feats>> joined = strategy.join(input_A,
>>> input_B, JoinType.INNER_JOIN);
>>>
>>>
>>> So the Avro type probably is not the issue.
>>>
>>>
>>> Any advice?
>>>
>>>
>>> Lucy
>>>
>>> On Wed, May 13, 2015 at 12:22 PM, Lucy Chen <lu...@gmail.com>
>>> wrote:
>>>
>>>> Hi all,
>>>>
>>>>          I had a join step in my crunch pipeline, and it looks like the
>>>> following:
>>>>
>>>> //get label data
>>>>
>>>> PType<Labels> LabelsType = Avros.records(Labels.class);
>>>>
>>>> PCollection<Labels> training_labels = input.parallelDo(new
>>>> LabelDataParser(), LabelsType);
>>>>
>>>> PTable<String, Labels> labels_data = training_labels.
>>>>
>>>>  parallelDo(new KeyOnLabels("sample_ID"), tableOf(strings(),
>>>> LabelsType));
>>>>
>>>> //get features
>>>>
>>>> PType<Feats> FeatsType = Avros.records(Feats.class);
>>>>
>>>> PCollection<Feats> training_feats = Feature.FeatLoader(pipeline,
>>>> sample_features_inputs);
>>>>
>>>> PTable<String, Feats> feats_data = training_feats.parallelDo(new
>>>> KeyOnFeats("sample_ID"), tableOf(strings(), FeatsType));
>>>>
>>>>
>>>> //join labels and features
>>>>
>>>> JoinStrategy<String, Labels, Feats> strategy = new
>>>> DefaultJoinStrategy<String, Labels, Feats>(20);
>>>>
>>>> PTable<String, Pair<Labels, Feats>> joined_training = strategy.
>>>>
>>>> join(labels_data, feats_data, JoinType.INNER_JOIN);
>>>>
>>>>
>>>> //class Labels
>>>>
>>>> public class Labels implements java.io.Serializable, Cloneable{
>>>>
>>>>  private String class_ID;
>>>>
>>>> private String sample_ID;
>>>>
>>>> private int binary_ind;
>>>>
>>>>  public Labels()
>>>>
>>>> {
>>>>
>>>> this(null, null, 0);
>>>>
>>>> }
>>>>
>>>>          public Labels(String class_ID, String sample_ID, int ind)
>>>>
>>>> {
>>>>
>>>> this.class_ID = class_ID;
>>>>
>>>> this.sample_ID = sample_ID;
>>>>
>>>> this.binary_ind = ind;
>>>>
>>>> }
>>>>
>>>>         ...
>>>>
>>>> }
>>>>
>>>>
>>>> //class Feats
>>>>
>>>>
>>>> public class *Feats* implements java.io.Serializable, Cloneable{
>>>>
>>>>  private String sample_id;
>>>>
>>>> private String sample_name;
>>>>
>>>> private Map<String, Float> feat;
>>>>
>>>>  public Feats()
>>>>
>>>> {
>>>>
>>>> this(null, null, null);
>>>>
>>>> }
>>>>
>>>>  public Feats(String id, String name, Map<String, Float> feat)
>>>>
>>>> {
>>>>
>>>> this.sample_id = id;
>>>>
>>>> this.sample_name = name;
>>>>
>>>> this.feat = feat;
>>>>
>>>>  }
>>>>
>>>>        ...
>>>>
>>>>
>>>> }
>>>>
>>>>
>>>>    The outputs of labels_data and feats_data are both fine; but the
>>>> join step throws the following exception:
>>>>
>>>>
>>>> Error: java.lang.ClassCastException: java.lang.String cannot be cast to
>>>> org.apache.crunch.Pair at
>>>> org.apache.crunch.lib.join.DefaultJoinStrategy$1.map(DefaultJoinStrategy.java:87)
>>>> at org.apache.crunch.MapFn.process(MapFn.java:34) at
>>>> org.apache.crunch.impl.mr.run.RTNode.process(RTNode.java:98) at
>>>> org.apache.crunch.impl.mr.emit.IntermediateEmitter.emit(IntermediateEmitter.java:56)
>>>> at org.apache.crunch.MapFn.process(MapFn.java:34) at
>>>> org.apache.crunch.impl.mr.run.RTNode.process(RTNode.java:98) at
>>>> org.apache.crunch.impl.mr.run.RTNode.process(RTNode.java:109) at
>>>> org.apache.crunch.impl.mr.run.CrunchMapper.map(CrunchMapper.java:60) at
>>>> org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:145) at
>>>> org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:763) at
>>>> org.apache.hadoop.mapred.MapTask.run(MapTask.java:339) at
>>>> org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:162) at
>>>> java.security.AccessController.doPrivileged(Native Method) at
>>>> javax.security.auth.Subject.doAs(Subject.java:415) at
>>>> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1491)
>>>> at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:157)
>>>>
>>>>
>>>>        This issue already bothered me for a while; Did any one get a
>>>> similar issue here? Is there another option that will solve it?
>>>>
>>>>
>>>>       Btw, I already successfully ran the following joining job:
>>>>
>>>>
>>>> JoinStrategy<String, Float, Tuple3<String, String, Float>> strategy =
>>>> new DefaultJoinStrategy<String, Float, Tuple3<String, String,
>>>> Float>>(100);
>>>>
>>>>  PTable<String, Pair<Float,Tuple3<String, String, Float>>> joined =
>>>> strategy.join(input_A, input_B, JoinType.INNER_JOIN);
>>>>
>>>>
>>>>    So I guess the issue may be still related to the Avro types that I
>>>> defined.
>>>>
>>>>
>>>>         Thanks for your advice.
>>>>
>>>>
>>>> Lucy
>>>>
>>>>
>>>>
>>>>
>>>>
>>>
>>
>

Re: question about join

Posted by Lucy Chen <lu...@gmail.com>.
Hi Josh,

         Those two functions bring the keys for each PCollections so that
they can join in the next step. Here is the functions look like:

public class KeyOnLabels extends DoFn<Labels, Pair<String, Labels>>{

 private String key;

 public KeyOnLabels(String input)

{

this.key = input;

}

 @Override

public void process(Labels input, Emitter<Pair<String, Labels>> emitter)

{

if(key.equalsIgnoreCase("class_ID"))

   emitter.emit(Pair.of(input.getClassID(), input));

else if(key.equalsIgnoreCase("sample_ID"))

emitter.emit(Pair.of(input.getSampleID(), input));

else

emitter.emit(Pair.of(Integer.toString(input.getBinaryIndicator()), input));

 }


}

public class KeyOnFeats extends DoFn<Feats, Pair<String, Feats>>{

 private final static Logger logger = Logger

      .getLogger(KeyOnFeats.class.getName());

private String key;

public KeyOnFeats(String input)

{

this.key = input;

}

@Override

public void process(Feats input, Emitter<Pair<String, Feats>> emitter)

{

if(key.equals("sample_ID"))

emitter.emit(Pair.of(input.getSampleID(), input));

else

logger.error("The key should be specified as sample_ID only!");

}


}


Meanwhile I checked the two outputs and they looked like normal.Something
like the follows:


{"key": "ID1", "value": Feats@14f5e86}


{"key": "ID1", "value": Labels@489983f3}


    That's why I feel something weird happened in the join step.


    Thanks!


Lucy

On Wed, May 13, 2015 at 12:34 PM, Josh Wills <jo...@gmail.com> wrote:

> What do the KeyOnLabels and KeyOnFeats functions return? From the error,
> it looks like the labels_data PTable is actually a PCollection<String>,
> which would happen if KeyOnLabels was somehow returning a String in some
> situations despite claiming to return a Pair<String, Labels>.
>
> On Wed, May 13, 2015 at 12:31 PM, Lucy Chen <lu...@gmail.com>
> wrote:
>
>> Meanwhile, I just realized that one of my joining jobs look also OK,
>> which also had the Avro type in it:
>>
>> JoinStrategy<String, Feats, Feats> strategy
>>
>> = new DefaultJoinStrategy<String, Feats, Feats>(50);
>>
>> PTable<String, Pair<Feats,Feats>> joined = strategy.join(input_A,
>> input_B, JoinType.INNER_JOIN);
>>
>>
>> So the Avro type probably is not the issue.
>>
>>
>> Any advice?
>>
>>
>> Lucy
>>
>> On Wed, May 13, 2015 at 12:22 PM, Lucy Chen <lu...@gmail.com>
>> wrote:
>>
>>> Hi all,
>>>
>>>          I had a join step in my crunch pipeline, and it looks like the
>>> following:
>>>
>>> //get label data
>>>
>>> PType<Labels> LabelsType = Avros.records(Labels.class);
>>>
>>> PCollection<Labels> training_labels = input.parallelDo(new
>>> LabelDataParser(), LabelsType);
>>>
>>> PTable<String, Labels> labels_data = training_labels.
>>>
>>>  parallelDo(new KeyOnLabels("sample_ID"), tableOf(strings(),
>>> LabelsType));
>>>
>>> //get features
>>>
>>> PType<Feats> FeatsType = Avros.records(Feats.class);
>>>
>>> PCollection<Feats> training_feats = Feature.FeatLoader(pipeline,
>>> sample_features_inputs);
>>>
>>> PTable<String, Feats> feats_data = training_feats.parallelDo(new
>>> KeyOnFeats("sample_ID"), tableOf(strings(), FeatsType));
>>>
>>>
>>> //join labels and features
>>>
>>> JoinStrategy<String, Labels, Feats> strategy = new
>>> DefaultJoinStrategy<String, Labels, Feats>(20);
>>>
>>> PTable<String, Pair<Labels, Feats>> joined_training = strategy.
>>>
>>> join(labels_data, feats_data, JoinType.INNER_JOIN);
>>>
>>>
>>> //class Labels
>>>
>>> public class Labels implements java.io.Serializable, Cloneable{
>>>
>>>  private String class_ID;
>>>
>>> private String sample_ID;
>>>
>>> private int binary_ind;
>>>
>>>  public Labels()
>>>
>>> {
>>>
>>> this(null, null, 0);
>>>
>>> }
>>>
>>>          public Labels(String class_ID, String sample_ID, int ind)
>>>
>>> {
>>>
>>> this.class_ID = class_ID;
>>>
>>> this.sample_ID = sample_ID;
>>>
>>> this.binary_ind = ind;
>>>
>>> }
>>>
>>>         ...
>>>
>>> }
>>>
>>>
>>> //class Feats
>>>
>>>
>>> public class *Feats* implements java.io.Serializable, Cloneable{
>>>
>>>  private String sample_id;
>>>
>>> private String sample_name;
>>>
>>> private Map<String, Float> feat;
>>>
>>>  public Feats()
>>>
>>> {
>>>
>>> this(null, null, null);
>>>
>>> }
>>>
>>>  public Feats(String id, String name, Map<String, Float> feat)
>>>
>>> {
>>>
>>> this.sample_id = id;
>>>
>>> this.sample_name = name;
>>>
>>> this.feat = feat;
>>>
>>>  }
>>>
>>>        ...
>>>
>>>
>>> }
>>>
>>>
>>>    The outputs of labels_data and feats_data are both fine; but the
>>> join step throws the following exception:
>>>
>>>
>>> Error: java.lang.ClassCastException: java.lang.String cannot be cast to
>>> org.apache.crunch.Pair at
>>> org.apache.crunch.lib.join.DefaultJoinStrategy$1.map(DefaultJoinStrategy.java:87)
>>> at org.apache.crunch.MapFn.process(MapFn.java:34) at
>>> org.apache.crunch.impl.mr.run.RTNode.process(RTNode.java:98) at
>>> org.apache.crunch.impl.mr.emit.IntermediateEmitter.emit(IntermediateEmitter.java:56)
>>> at org.apache.crunch.MapFn.process(MapFn.java:34) at
>>> org.apache.crunch.impl.mr.run.RTNode.process(RTNode.java:98) at
>>> org.apache.crunch.impl.mr.run.RTNode.process(RTNode.java:109) at
>>> org.apache.crunch.impl.mr.run.CrunchMapper.map(CrunchMapper.java:60) at
>>> org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:145) at
>>> org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:763) at
>>> org.apache.hadoop.mapred.MapTask.run(MapTask.java:339) at
>>> org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:162) at
>>> java.security.AccessController.doPrivileged(Native Method) at
>>> javax.security.auth.Subject.doAs(Subject.java:415) at
>>> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1491)
>>> at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:157)
>>>
>>>
>>>        This issue already bothered me for a while; Did any one get a
>>> similar issue here? Is there another option that will solve it?
>>>
>>>
>>>       Btw, I already successfully ran the following joining job:
>>>
>>>
>>> JoinStrategy<String, Float, Tuple3<String, String, Float>> strategy =
>>> new DefaultJoinStrategy<String, Float, Tuple3<String, String,
>>> Float>>(100);
>>>
>>>  PTable<String, Pair<Float,Tuple3<String, String, Float>>> joined =
>>> strategy.join(input_A, input_B, JoinType.INNER_JOIN);
>>>
>>>
>>>    So I guess the issue may be still related to the Avro types that I
>>> defined.
>>>
>>>
>>>         Thanks for your advice.
>>>
>>>
>>> Lucy
>>>
>>>
>>>
>>>
>>>
>>
>

Re: question about join

Posted by Josh Wills <jo...@gmail.com>.
What do the KeyOnLabels and KeyOnFeats functions return? From the error, it
looks like the labels_data PTable is actually a PCollection<String>, which
would happen if KeyOnLabels was somehow returning a String in some
situations despite claiming to return a Pair<String, Labels>.

On Wed, May 13, 2015 at 12:31 PM, Lucy Chen <lu...@gmail.com>
wrote:

> Meanwhile, I just realized that one of my joining jobs look also OK, which
> also had the Avro type in it:
>
> JoinStrategy<String, Feats, Feats> strategy
>
> = new DefaultJoinStrategy<String, Feats, Feats>(50);
>
> PTable<String, Pair<Feats,Feats>> joined = strategy.join(input_A, input_B,
> JoinType.INNER_JOIN);
>
>
> So the Avro type probably is not the issue.
>
>
> Any advice?
>
>
> Lucy
>
> On Wed, May 13, 2015 at 12:22 PM, Lucy Chen <lu...@gmail.com>
> wrote:
>
>> Hi all,
>>
>>          I had a join step in my crunch pipeline, and it looks like the
>> following:
>>
>> //get label data
>>
>> PType<Labels> LabelsType = Avros.records(Labels.class);
>>
>> PCollection<Labels> training_labels = input.parallelDo(new
>> LabelDataParser(), LabelsType);
>>
>> PTable<String, Labels> labels_data = training_labels.
>>
>>  parallelDo(new KeyOnLabels("sample_ID"), tableOf(strings(),
>> LabelsType));
>>
>> //get features
>>
>> PType<Feats> FeatsType = Avros.records(Feats.class);
>>
>> PCollection<Feats> training_feats = Feature.FeatLoader(pipeline,
>> sample_features_inputs);
>>
>> PTable<String, Feats> feats_data = training_feats.parallelDo(new
>> KeyOnFeats("sample_ID"), tableOf(strings(), FeatsType));
>>
>>
>> //join labels and features
>>
>> JoinStrategy<String, Labels, Feats> strategy = new
>> DefaultJoinStrategy<String, Labels, Feats>(20);
>>
>> PTable<String, Pair<Labels, Feats>> joined_training = strategy.
>>
>> join(labels_data, feats_data, JoinType.INNER_JOIN);
>>
>>
>> //class Labels
>>
>> public class Labels implements java.io.Serializable, Cloneable{
>>
>>  private String class_ID;
>>
>> private String sample_ID;
>>
>> private int binary_ind;
>>
>>  public Labels()
>>
>> {
>>
>> this(null, null, 0);
>>
>> }
>>
>>          public Labels(String class_ID, String sample_ID, int ind)
>>
>> {
>>
>> this.class_ID = class_ID;
>>
>> this.sample_ID = sample_ID;
>>
>> this.binary_ind = ind;
>>
>> }
>>
>>         ...
>>
>> }
>>
>>
>> //class Feats
>>
>>
>> public class *Feats* implements java.io.Serializable, Cloneable{
>>
>>  private String sample_id;
>>
>> private String sample_name;
>>
>> private Map<String, Float> feat;
>>
>>  public Feats()
>>
>> {
>>
>> this(null, null, null);
>>
>> }
>>
>>  public Feats(String id, String name, Map<String, Float> feat)
>>
>> {
>>
>> this.sample_id = id;
>>
>> this.sample_name = name;
>>
>> this.feat = feat;
>>
>>  }
>>
>>        ...
>>
>>
>> }
>>
>>
>>    The outputs of labels_data and feats_data are both fine; but the join
>> step throws the following exception:
>>
>>
>> Error: java.lang.ClassCastException: java.lang.String cannot be cast to
>> org.apache.crunch.Pair at
>> org.apache.crunch.lib.join.DefaultJoinStrategy$1.map(DefaultJoinStrategy.java:87)
>> at org.apache.crunch.MapFn.process(MapFn.java:34) at
>> org.apache.crunch.impl.mr.run.RTNode.process(RTNode.java:98) at
>> org.apache.crunch.impl.mr.emit.IntermediateEmitter.emit(IntermediateEmitter.java:56)
>> at org.apache.crunch.MapFn.process(MapFn.java:34) at
>> org.apache.crunch.impl.mr.run.RTNode.process(RTNode.java:98) at
>> org.apache.crunch.impl.mr.run.RTNode.process(RTNode.java:109) at
>> org.apache.crunch.impl.mr.run.CrunchMapper.map(CrunchMapper.java:60) at
>> org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:145) at
>> org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:763) at
>> org.apache.hadoop.mapred.MapTask.run(MapTask.java:339) at
>> org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:162) at
>> java.security.AccessController.doPrivileged(Native Method) at
>> javax.security.auth.Subject.doAs(Subject.java:415) at
>> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1491)
>> at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:157)
>>
>>
>>        This issue already bothered me for a while; Did any one get a
>> similar issue here? Is there another option that will solve it?
>>
>>
>>       Btw, I already successfully ran the following joining job:
>>
>>
>> JoinStrategy<String, Float, Tuple3<String, String, Float>> strategy = new
>> DefaultJoinStrategy<String, Float, Tuple3<String, String, Float>>(100);
>>
>>  PTable<String, Pair<Float,Tuple3<String, String, Float>>> joined =
>> strategy.join(input_A, input_B, JoinType.INNER_JOIN);
>>
>>
>>    So I guess the issue may be still related to the Avro types that I
>> defined.
>>
>>
>>         Thanks for your advice.
>>
>>
>> Lucy
>>
>>
>>
>>
>>
>

Re: question about join

Posted by Lucy Chen <lu...@gmail.com>.
Meanwhile, I just realized that one of my joining jobs look also OK, which
also had the Avro type in it:

JoinStrategy<String, Feats, Feats> strategy

= new DefaultJoinStrategy<String, Feats, Feats>(50);

PTable<String, Pair<Feats,Feats>> joined = strategy.join(input_A, input_B,
JoinType.INNER_JOIN);


So the Avro type probably is not the issue.


Any advice?


Lucy

On Wed, May 13, 2015 at 12:22 PM, Lucy Chen <lu...@gmail.com>
wrote:

> Hi all,
>
>          I had a join step in my crunch pipeline, and it looks like the
> following:
>
> //get label data
>
> PType<Labels> LabelsType = Avros.records(Labels.class);
>
> PCollection<Labels> training_labels = input.parallelDo(new
> LabelDataParser(), LabelsType);
>
> PTable<String, Labels> labels_data = training_labels.
>
>  parallelDo(new KeyOnLabels("sample_ID"), tableOf(strings(), LabelsType));
>
> //get features
>
> PType<Feats> FeatsType = Avros.records(Feats.class);
>
> PCollection<Feats> training_feats = Feature.FeatLoader(pipeline,
> sample_features_inputs);
>
> PTable<String, Feats> feats_data = training_feats.parallelDo(new
> KeyOnFeats("sample_ID"), tableOf(strings(), FeatsType));
>
>
> //join labels and features
>
> JoinStrategy<String, Labels, Feats> strategy = new
> DefaultJoinStrategy<String, Labels, Feats>(20);
>
> PTable<String, Pair<Labels, Feats>> joined_training = strategy.
>
> join(labels_data, feats_data, JoinType.INNER_JOIN);
>
>
> //class Labels
>
> public class Labels implements java.io.Serializable, Cloneable{
>
>  private String class_ID;
>
> private String sample_ID;
>
> private int binary_ind;
>
>  public Labels()
>
> {
>
> this(null, null, 0);
>
> }
>
>          public Labels(String class_ID, String sample_ID, int ind)
>
> {
>
> this.class_ID = class_ID;
>
> this.sample_ID = sample_ID;
>
> this.binary_ind = ind;
>
> }
>
>         ...
>
> }
>
>
> //class Feats
>
>
> public class *Feats* implements java.io.Serializable, Cloneable{
>
>  private String sample_id;
>
> private String sample_name;
>
> private Map<String, Float> feat;
>
>  public Feats()
>
> {
>
> this(null, null, null);
>
> }
>
>  public Feats(String id, String name, Map<String, Float> feat)
>
> {
>
> this.sample_id = id;
>
> this.sample_name = name;
>
> this.feat = feat;
>
>  }
>
>        ...
>
>
> }
>
>
>    The outputs of labels_data and feats_data are both fine; but the join
> step throws the following exception:
>
>
> Error: java.lang.ClassCastException: java.lang.String cannot be cast to
> org.apache.crunch.Pair at
> org.apache.crunch.lib.join.DefaultJoinStrategy$1.map(DefaultJoinStrategy.java:87)
> at org.apache.crunch.MapFn.process(MapFn.java:34) at
> org.apache.crunch.impl.mr.run.RTNode.process(RTNode.java:98) at
> org.apache.crunch.impl.mr.emit.IntermediateEmitter.emit(IntermediateEmitter.java:56)
> at org.apache.crunch.MapFn.process(MapFn.java:34) at
> org.apache.crunch.impl.mr.run.RTNode.process(RTNode.java:98) at
> org.apache.crunch.impl.mr.run.RTNode.process(RTNode.java:109) at
> org.apache.crunch.impl.mr.run.CrunchMapper.map(CrunchMapper.java:60) at
> org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:145) at
> org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:763) at
> org.apache.hadoop.mapred.MapTask.run(MapTask.java:339) at
> org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:162) at
> java.security.AccessController.doPrivileged(Native Method) at
> javax.security.auth.Subject.doAs(Subject.java:415) at
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1491)
> at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:157)
>
>
>        This issue already bothered me for a while; Did any one get a
> similar issue here? Is there another option that will solve it?
>
>
>       Btw, I already successfully ran the following joining job:
>
>
> JoinStrategy<String, Float, Tuple3<String, String, Float>> strategy = new
> DefaultJoinStrategy<String, Float, Tuple3<String, String, Float>>(100);
>
>  PTable<String, Pair<Float,Tuple3<String, String, Float>>> joined =
> strategy.join(input_A, input_B, JoinType.INNER_JOIN);
>
>
>    So I guess the issue may be still related to the Avro types that I
> defined.
>
>
>         Thanks for your advice.
>
>
> Lucy
>
>
>
>
>