Posted to user@spark.apache.org by Anton Kravchenko <kr...@gmail.com> on 2017/06/15 18:29:26 UTC

access a broadcasted variable from within ForeachPartitionFunction Java API

How would one access a broadcast variable from within a
ForeachPartitionFunction using the Spark (2.0.1) Java API?

Integer _bcv = 123;
Broadcast<Integer> bcv = spark.sparkContext().broadcast(_bcv);
Dataset<Row> df_sql = spark.sql("select * from atable");

df_sql.foreachPartition(new ForeachPartitionFunction<Row>() {
    @Override
    public void call(Iterator<Row> t) throws Exception {
        System.out.println(bcv.value());
    }
});

Re: access a broadcasted variable from within ForeachPartitionFunction Java API

Posted by Ryan <ry...@gmail.com>.
Sorry, I was wrong. I checked the code again: Broadcast is serializable and
can be used inside lambdas and inner classes. In fact, according to the
Javadoc, it should be used this way so that the large value it wraps is not
serialized along with each task.

So what went wrong with the first approach?
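
As a rough illustration of why capturing the handle is cheaper than capturing
the value, the sketch below uses plain Java serialization as a stand-in for
shipping a task to a worker. It does not use Spark at all: Handle, LOCAL_CACHE
and HandleSizeDemo are hypothetical names made up for this example, and the
static cache only imitates a per-process store of broadcast values. A real
Broadcast differs in detail.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for a broadcast handle: only the id is serialized.
final class Handle implements Serializable {
    // Imitates a per-process cache of broadcast values (an assumption, not Spark code).
    private static final Map<Long, int[]> LOCAL_CACHE = new ConcurrentHashMap<>();
    private final long id;

    Handle(long id, int[] value) {
        this.id = id;
        LOCAL_CACHE.put(id, value);
    }

    // Resolve the value from the local cache instead of carrying it in the handle.
    int[] value() {
        return LOCAL_CACHE.get(id);
    }
}

final class HandleSizeDemo {
    // Number of bytes Java serialization produces for the given object.
    static int serializedSize(Object obj) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
        return bytes.size();
    }

    // A task that captures the raw value: the whole array is serialized with it.
    static Runnable taskCapturingValue(int[] big) {
        return (Runnable & Serializable) () -> System.out.println(big.length);
    }

    // A task that captures the handle: only the handle's id is serialized with it.
    static Runnable taskCapturingHandle(Handle handle) {
        return (Runnable & Serializable) () -> System.out.println(handle.value().length);
    }

    public static void main(String[] args) {
        int[] big = new int[100_000];
        System.out.println("capturing value:  " + serializedSize(taskCapturingValue(big)) + " bytes");
        System.out.println("capturing handle: " + serializedSize(taskCapturingHandle(new Handle(1L, big))) + " bytes");
    }
}
```

The first task carries at least 400,000 bytes (100,000 ints), while the second
carries only the small handle. That is the saving the Javadoc remark refers to.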

On Sat, Jun 24, 2017 at 4:46 AM, Anton Kravchenko <
kravchenko.anton86@gmail.com> wrote:

> ok, this one is doing what I want
>
> SparkConf conf = new SparkConf()
>         .set("spark.sql.warehouse.dir", "hdfs://localhost:9000/user/hive/warehouse")
>         .setMaster("local[*]")
>         .setAppName("TestApp");
>
> JavaSparkContext sc = new JavaSparkContext(conf);
>
> SparkSession session = SparkSession
>         .builder()
>         .appName("TestApp").master("local[*]")
>         .getOrCreate();
>
> Integer _bcv =  123;
> Broadcast<Integer> bcv = sc.broadcast(_bcv);
>
> WrapBCV.setBCV(bcv); // implemented in WrapBCV.java
>
> df_sql.foreachPartition(new ProcessSinglePartition()); //implemented in ProcessSinglePartition.java
>
> Where:
> ProcessSinglePartition.java
>
> public class ProcessSinglePartition implements ForeachPartitionFunction<Row>  {
>     public void call(Iterator<Row> it) throws Exception {
>         System.out.println(WrapBCV.getBCV());
>     }
> }
>
> WrapBCV.java
>
> public class WrapBCV {
>     private static Broadcast<Integer> bcv;
>     public static void setBCV(Broadcast<Integer> setbcv){ bcv = setbcv; }
>     public static Integer getBCV()
>     {
>         return bcv.value();
>     }
> }
>
>
> On Fri, Jun 16, 2017 at 3:35 AM, Ryan <ry...@gmail.com> wrote:
>
>> I don't think Broadcast itself can be serialized. you can get the value
>> out on the driver side and refer to it in foreach, then the value would be
>> serialized with the lambda expr and sent to workers.
>>
>> On Fri, Jun 16, 2017 at 2:29 AM, Anton Kravchenko <
>> kravchenko.anton86@gmail.com> wrote:
>>
>>> How one would access a broadcasted variable from within
>>> ForeachPartitionFunction  Spark(2.0.1) Java API ?
>>>
>>> Integer _bcv = 123;
>>> Broadcast<Integer> bcv = spark.sparkContext().broadcast(_bcv);
>>> Dataset<Row> df_sql = spark.sql("select * from atable");
>>>
>>> df_sql.foreachPartition(new ForeachPartitionFunction<Row>() {
>>>     public void call(Iterator<Row> t) throws Exception {
>>>         System.out.println(bcv.value());
>>>     }}
>>> );
>>>
>>>
>>
>

Re: access a broadcasted variable from within ForeachPartitionFunction Java API

Posted by Anton Kravchenko <kr...@gmail.com>.
OK, this version does what I want:

SparkConf conf = new SparkConf()
        .set("spark.sql.warehouse.dir", "hdfs://localhost:9000/user/hive/warehouse")
        .setMaster("local[*]")
        .setAppName("TestApp");

JavaSparkContext sc = new JavaSparkContext(conf);

SparkSession session = SparkSession
        .builder()
        .appName("TestApp").master("local[*]")
        .getOrCreate();

Integer _bcv =  123;
Broadcast<Integer> bcv = sc.broadcast(_bcv);

WrapBCV.setBCV(bcv); // implemented in WrapBCV.java

df_sql.foreachPartition(new ProcessSinglePartition()); // implemented in ProcessSinglePartition.java

Where:
ProcessSinglePartition.java

public class ProcessSinglePartition implements ForeachPartitionFunction<Row>  {
    public void call(Iterator<Row> it) throws Exception {
        System.out.println(WrapBCV.getBCV());
    }
}

WrapBCV.java

public class WrapBCV {
    private static Broadcast<Integer> bcv;
    public static void setBCV(Broadcast<Integer> setbcv){ bcv = setbcv; }
    public static Integer getBCV()
    {
        return bcv.value();
    }
}
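
One caveat about the static holder, sketched below with plain Java
serialization rather than Spark: static fields are not part of an object's
serialized state. With master local[*] the driver and the tasks share one JVM,
so the static field set by setBCV is visible to the tasks. On a real cluster
each executor JVM would presumably load WrapBCV with bcv still null. The
classes StaticHolder, ReadsStatic, ReadsField and StaticFieldDemo are
hypothetical stand-ins, and resetting the static field to null only simulates
a fresh JVM.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical stand-in for WrapBCV: the value lives in a static field.
final class StaticHolder {
    static Integer value;
}

// Reads the static field, much as ProcessSinglePartition does through WrapBCV.
final class ReadsStatic implements Serializable {
    Integer call() {
        return StaticHolder.value;
    }
}

// Keeps the value in an instance field, so it travels with the serialized object.
final class ReadsField implements Serializable {
    private final Integer value;

    ReadsField(Integer value) {
        this.value = value;
    }

    Integer call() {
        return value;
    }
}

final class StaticFieldDemo {
    static byte[] serialize(Object obj) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
        return bytes.toByteArray();
    }

    static Object deserialize(byte[] data) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return in.readObject();
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        StaticHolder.value = 123;                    // set on the "driver"
        byte[] viaStatic = serialize(new ReadsStatic());
        byte[] viaField = serialize(new ReadsField(123));

        StaticHolder.value = null;                   // simulate a JVM where the setter never ran

        System.out.println(((ReadsStatic) deserialize(viaStatic)).call()); // prints null
        System.out.println(((ReadsField) deserialize(viaField)).call());   // prints 123
    }
}
```

By analogy, a ForeachPartitionFunction that stores the Broadcast in an
instance field (passed in through its constructor) should be safer than a
static holder, since the field is serialized together with the function.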



Re: access a broadcasted variable from within ForeachPartitionFunction Java API

Posted by Ryan <ry...@gmail.com>.
I don't think Broadcast itself can be serialized. You can get the value out
on the driver side and refer to it in foreach; the value would then be
serialized with the lambda expression and sent to the workers.
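
A minimal sketch of the "value travels with the lambda" idea, using plain
Java serialization in place of Spark: a serializable lambda captures a local
value, is written to bytes, read back, and still returns that value.
CapturedValueDemo, roundTrip and capture are hypothetical names made up for
this illustration.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.function.Supplier;

// Hypothetical demo class; Java serialization stands in for shipping a task to a worker.
final class CapturedValueDemo {

    // Serialize an object to bytes and read it back, as if it had crossed the wire.
    @SuppressWarnings("unchecked")
    static <T> T roundTrip(T obj) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (T) in.readObject();
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    // Build a serializable lambda that captures a plain value.
    static Supplier<Integer> capture(Integer value) {
        return (Supplier<Integer> & Serializable) () -> value;
    }

    public static void main(String[] args) {
        Integer plainValue = 123;                          // read on the "driver"
        Supplier<Integer> shipped = roundTrip(capture(plainValue));
        System.out.println(shipped.get());                 // prints 123
    }
}
```

The captured copy is serialized with every such lambda, which is acceptable
for a small Integer but wasteful for a large object.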
