Posted to dev@spark.apache.org by Rachana Srivastava <Ra...@markmonitor.com> on 2015/09/14 18:54:25 UTC

JavaRDD using Reflection

Hello all,

I am working on a problem that requires us to create different sets of JavaRDDs based on different input arguments.  We are getting the following error when we try to use a factory to create a JavaRDD.  The error message is clear, but I am wondering whether there is any workaround.

Question:
How can we create different sets of JavaRDDs dynamically, based on different input arguments?  We are trying to implement something like the factory pattern.

Error Message:
RDD transformations and actions can only be invoked by the driver, not inside of other transformations; for example, rdd1.map(x => rdd2.values.count() * x) is invalid because the values transformation and count action cannot be performed inside of the rdd1.map transformation. For more information, see SPARK-5063.

Thanks,

Rachana

Re: JavaRDD using Reflection

Posted by Ankur Srivastava <an...@gmail.com>.
It is not reflection that is the issue here but the use of an RDD
transformation, "featureKeyClassPair.map", inside "lines.mapToPair".

From the code snippet you have sent, it is not very clear whether
getFeatureScore(id,data) invokes executeFeedFeatures. If it does, it is not
obvious that “data” is supposed to be huge and thus needs to be a PairRDD.
If it is not huge, you do not need a JavaPairRDD<String, String>; instead,
use a Map<String, String> and return a List<Double>.

If the data is huge and has to be a PairRDD, pull the logic that builds the
data PairRDD out of the mapToPair function, and then invoke the map function
on that RDD.
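
For the small-data case, the suggestion above (a Map<String, String> in, a List<Double> out) could be sketched as follows. This is a minimal illustration in plain Java with no Spark dependency, not the code from the thread: the class names FeatureFactorySketch and LengthFeature, the "weight" parameter, and the scoring rule are all hypothetical. Because the reflectively invoked method only touches ordinary collections, calling it from inside a function passed to mapToPair should not trigger the nested-RDD error.

```java
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class FeatureFactorySketch {

    /** Hypothetical feature class; stands in for MyClass from the thread. */
    public static class LengthFeature {
        // Plain collections in, plain collections out: no RDD is touched here.
        public static List<Double> executeFeedFeatures(Map<String, String> params,
                                                       Map<String, String> data) {
            // "weight" is an assumed parameter name used only for this sketch.
            double weight = Double.parseDouble(params.getOrDefault("weight", "1.0"));
            List<Double> scores = new ArrayList<>();
            for (Map.Entry<String, String> e : data.entrySet()) {
                scores.add(weight * e.getValue().length());
            }
            return scores;
        }
    }

    /** Looks up the feature class by name and invokes its static method. */
    @SuppressWarnings("unchecked")
    public static List<Double> executeFeedFactories(String featureClassName,
                                                    Map<String, String> params,
                                                    Map<String, String> data) throws Exception {
        Class<?> featureClass = Class.forName(featureClassName);
        Method m = featureClass.getMethod("executeFeedFeatures", Map.class, Map.class);
        // The target method is static, so the receiver argument is null.
        return (List<Double>) m.invoke(null, params, data);
    }

    public static void main(String[] args) throws Exception {
        Map<String, String> params = new LinkedHashMap<>();
        params.put("weight", "2.0");
        Map<String, String> data = new LinkedHashMap<>();
        data.put("a", "abc");
        data.put("b", "de");
        // prints [6.0, 4.0]
        System.out.println(
            executeFeedFactories(LengthFeature.class.getName(), params, data));
    }
}
```

If the data really is too large for a Map, this shape no longer applies, and the RDD has to be built and transformed on the driver instead.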

- Ankur

On Mon, Sep 14, 2015 at 12:43 PM, <Ra...@thomsonreuters.com>
wrote:

> Thanks so much Ajay and Ankur for your input.
>
>
>
> What we are trying to do is the following:  I am trying to invoke a method
> on a class using Java reflection to get the result.
>
>
>
> *THIS WORKS FINE*
>
> public static void main(String[] args) throws Exception {
>
>     final JavaSparkContext jsc = new JavaSparkContext(sparkConf);
>
>     ...
>
> METHOD THAT I AM TRYING TO INVOKE USING REFLECTION
>
>     JavaPairDStream<String, String> urlFeatureScore = lines.mapToPair(
>             new PairFunction<String, String, String>() {
>
>         public Tuple2<String, String> call(final String urlString) throws Exception {
>
>             String featureScore = getFeatureScore(id, data);
>
>             return new Tuple2<String, String>(urlString, featureScore);
>
>         }
>
>     });
>
>     ...
>
> *REPLACED WITH METHOD INVOKED USING REFLECTION: DOES NOT WORK, ERROR MESSAGE
> BELOW.*
>
>     executeFeedFactories2(featureClassName, featureParam, featureData);
>
>     jssc.start();
>
>     jssc.awaitTermination();
>
>   }
>
>
>
> *Splitting the same work into a class invoked using reflection does not work:*
>
>
>
> private static JavaRDD<Double> executeFeedFactories2(String featureClassName,
>         Map<String, String> featureParam,
>         JavaPairRDD<String, String> featureData) throws Exception {
>
>     // Load the feature class by the name passed in.
>     Class<?> featureClass = Class.forName(featureClassName);
>
>     Method m = featureClass.getMethod("executeFeedFeatures",
>             Map.class, JavaPairRDD.class);
>
>     // executeFeedFeatures is static, so no receiver instance is needed.
>     @SuppressWarnings("unchecked")
>     JavaRDD<Double> score = (JavaRDD<Double>) m.invoke(null, featureParam, featureData);
>
>     return score;
>
> }
>
>
>
> public class MyClass {
>
>     public static JavaRDD<Double> executeFeedFeatures(Map<String, String> featureParamMap,
>             JavaPairRDD<String, String> featureKeyClassPair) {
>
>         // This map is an RDD transformation, so it must run on the driver.
>         JavaRDD<Double> featureScoreRDD = featureKeyClassPair.map(
>                 new Function<Tuple2<String, String>, Double>() {
>
>                     public Double call(Tuple2<String, String> keyValue) {
>
>                         …
>
>                     }
>
>                 });
>
>         return featureScoreRDD;
>
>     }
>
> }
>
>
>
> Thanks again for all your help and advice.
>
>
>
> Regards,
>
>
>
> Rachana
>
>
>
> *From:* Ajay Singal [mailto:asingal11@gmail.com]
> *Sent:* Monday, September 14, 2015 12:20 PM
> *To:* Rachana Srivastava; Ankur Srivastava
> *Cc:* user@spark.apache.org; dev@spark.apache.org; Ajay Singal
> *Subject:* Re: JavaRDD using Reflection
>
>
>
> Hello Rachana,
>
>
>
> The easiest way would be to start with creating a 'parent' JavaRDD and run
> different filters (based on different input arguments) to create respective
> 'child' JavaRDDs dynamically.
>
>
>
> Notice that the creation of these children RDDs is handled by the
> application driver.
>
>
>
> Hope this helps!
>
> Ajay
>
>
>
> On Mon, Sep 14, 2015 at 1:21 PM, Ankur Srivastava <
> ankur.srivastava@gmail.com> wrote:
>
> Hi Rachana
>
>
>
> I didn't get you r question fully but as the error says you can not
> perform a rdd transformation or action inside another transformation. In
> your example you are performing an action "rdd2.values.count()" in side
> the "map" transformation. It is not allowed and in any case this will be
> very inefficient too.
>
>
>
> you should do something like this:
>
>
>
> final long rdd2_count = rdd2.values.count()
>
> rdd1.map(x => rdd2_count * x)
>
>
>
> Hope this helps!!
>
>
>
> - Ankur
>
>
>
> On Mon, Sep 14, 2015 at 9:54 AM, Rachana Srivastava <
> Rachana.Srivastava@markmonitor.com> wrote:
>
> Hello all,
>
>
>
> I am working a problem that requires us to create different set of JavaRDD
> based on different input arguments.  We are getting following error when we
> try to use a factory to create JavaRDD.  Error message is clear but I am
> wondering is there any workaround.
>
>
>
> *Question: *
>
> How to create different set of JavaRDD based on different input arguments
> dynamically.  Trying to implement something like factory pattern.
>
>
>
> *Error Message: *
>
> RDD transformations and actions can only be invoked by the driver, not
> inside of other transformations; for example, rdd1.map(x =>
> rdd2.values.count() * x) is invalid because the values transformation and
> count action cannot be performed inside of the rdd1.map transformation. For
> more information, see SPARK-5063.
>
>
>
> Thanks,
>
>
>
> Rachana
>
>
>
>
>

Re: JavaRDD using Reflection

Posted by Ajay Singal <as...@gmail.com>.
Hello Rachana,

The easiest way would be to start by creating a 'parent' JavaRDD and then run
different filters (based on different input arguments) to create the
respective 'child' JavaRDDs dynamically.

Notice that the creation of these child RDDs is handled by the
application driver.
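
The parent/child idea can be sketched roughly as below. To keep the example runnable without a Spark dependency, a java.util List and its stream stand in for the parent JavaRDD; with Spark, the selected predicate would instead be passed to the parent's filter method from driver code. The class name ChildFilterSketch, the registry, and the argument names "secure" and "short" are hypothetical and exist only for illustration.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;
import java.util.stream.Collectors;

public class ChildFilterSketch {

    // Hypothetical registry: input argument -> filter to apply to the parent.
    private static final Map<String, Predicate<String>> FILTERS = new HashMap<>();
    static {
        FILTERS.put("secure", url -> url.startsWith("https://"));
        FILTERS.put("short", url -> url.length() < 20);
    }

    /** Chooses the filter by argument and derives a child collection from the parent. */
    public static List<String> child(List<String> parent, String argument) {
        Predicate<String> filter = FILTERS.get(argument);
        if (filter == null) {
            throw new IllegalArgumentException("Unknown argument: " + argument);
        }
        // Stand-in for filtering the parent JavaRDD on the driver.
        return parent.stream().filter(filter).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> parent = Arrays.asList(
            "https://spark.apache.org", "http://a.example", "https://b.example");
        System.out.println(child(parent, "secure"));
        System.out.println(child(parent, "short"));
    }
}
```

The point of the design is that the choice of filter happens once, in driver code, and each child is derived from the parent rather than being created inside another transformation.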

Hope this helps!
Ajay

On Mon, Sep 14, 2015 at 1:21 PM, Ankur Srivastava <
ankur.srivastava@gmail.com> wrote:

> Hi Rachana
>
> I didn't get you r question fully but as the error says you can not
> perform a rdd transformation or action inside another transformation. In
> your example you are performing an action "rdd2.values.count()" in side
> the "map" transformation. It is not allowed and in any case this will be
> very inefficient too.
>
> you should do something like this:
>
> final long rdd2_count = rdd2.values.count()
> rdd1.map(x => rdd2_count * x)
>
> Hope this helps!!
>
> - Ankur
>
> On Mon, Sep 14, 2015 at 9:54 AM, Rachana Srivastava <
> Rachana.Srivastava@markmonitor.com> wrote:
>
>> Hello all,
>>
>>
>>
>> I am working a problem that requires us to create different set of
>> JavaRDD based on different input arguments.  We are getting following error
>> when we try to use a factory to create JavaRDD.  Error message is clear but
>> I am wondering is there any workaround.
>>
>>
>>
>> *Question: *
>>
>> How to create different set of JavaRDD based on different input arguments
>> dynamically.  Trying to implement something like factory pattern.
>>
>>
>>
>> *Error Message: *
>>
>> RDD transformations and actions can only be invoked by the driver, not
>> inside of other transformations; for example, rdd1.map(x =>
>> rdd2.values.count() * x) is invalid because the values transformation and
>> count action cannot be performed inside of the rdd1.map transformation. For
>> more information, see SPARK-5063.
>>
>>
>>
>> Thanks,
>>
>>
>>
>> Rachana
>>
>
>

Re: JavaRDD using Reflection

Posted by Ankur Srivastava <an...@gmail.com>.
Hi Rachana

I didn't fully get your question, but as the error says, you cannot perform
an RDD transformation or action inside another transformation. In your
example you are performing an action, "rdd2.values.count()", inside the "map"
transformation. This is not allowed, and in any case it would be very
inefficient.

You should do something like this:

// Run the action once on the driver, then use the plain value inside map.
val rdd2_count = rdd2.values.count()
rdd1.map(x => rdd2_count * x)

Hope this helps!!

- Ankur

On Mon, Sep 14, 2015 at 9:54 AM, Rachana Srivastava <
Rachana.Srivastava@markmonitor.com> wrote:

> Hello all,
>
>
>
> I am working a problem that requires us to create different set of JavaRDD
> based on different input arguments.  We are getting following error when we
> try to use a factory to create JavaRDD.  Error message is clear but I am
> wondering is there any workaround.
>
>
>
> *Question: *
>
> How to create different set of JavaRDD based on different input arguments
> dynamically.  Trying to implement something like factory pattern.
>
>
>
> *Error Message: *
>
> RDD transformations and actions can only be invoked by the driver, not
> inside of other transformations; for example, rdd1.map(x =>
> rdd2.values.count() * x) is invalid because the values transformation and
> count action cannot be performed inside of the rdd1.map transformation. For
> more information, see SPARK-5063.
>
>
>
> Thanks,
>
>
>
> Rachana
>