Posted to user@flink.apache.org by Ser Kho <kh...@yahoo.com> on 2016/06/06 19:14:02 UTC

Does Flink allows for encapsulation of transformations?

The question is how to encapsulate numerous transformations into one object, or maybe a function, in the Apache Flink Java API. I have tried to investigate this question using an example of Pi calculation (see below). I am wondering whether or not the suggested approach is valid from Flink's point of view. It works on one computer; however, I do not know how it will behave in a cluster setup. The code is given below, and the main idea behind it is as follows:
   - Create a class named classPI whose method compute() does all data transformations (see more about it below).
   - In the main method, create a DataSet as in: DataSet<classPI> opi = env.fromElements(new classPI());
   - Create DataSet<Double> PI, which is the output of a map() transformation that calls the object's method compute(), as in: DataSet<Double> PI = opi.map(new MapFunction<classPI, Double>() { public Double map(classPI objPI) { return objPI.compute(); }});
   - Now about classPI:
      - Its constructor instantiates an ExecutionEnvironment that is local to this class, as in: public classPI() { this.NumIter = 1000000; env = ExecutionEnvironment.getExecutionEnvironment(); }
        Thus, the code has two ExecutionEnvironment objects: one in main and another in the class classPI.
      - It has a method compute() that runs all data transformations (in this example it is just a few lines, but potentially it might contain tons of Flink transformations): public Double compute() { DataSet<Long> count = env.generateSequence(1, NumIter).map(new Sampler()).reduce(new SumReducer()); PI = 4.0 * count.collect().get(0) / NumIter; return PI; }

The whole code is given below. Again, the question is whether this is a valid approach for encapsulating data transformations in a class in a Flink setup that is supposed to be parallelizable on a cluster. Is there a better way to hide the details of data transformations? Thanks a lot!
-------------------------The code ----------------------
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;

public class PiEstimation {

    public static void main(String[] args) throws Exception {
        // this is one ExecutionEnvironment
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // this is the critical DataSet with my classPI that computes PI
        DataSet<classPI> opi = env.fromElements(new classPI());
        // this map calls the method compute() of class classPI that computes PI
        DataSet<Double> PI = opi.map(new MapFunction<classPI, Double>() {
            @Override
            public Double map(classPI objPI) throws Exception {
                // this is how I call the method compute() that calculates PI using transformations
                return objPI.compute();
            }
        });

        double pi = PI.collect().get(0);
        System.out.println("We estimate Pi to be: " + pi);
    }

    // this class is of no importance for my question; however, it is relevant for the pi calculation
    public static class Sampler implements MapFunction<Long, Long> {
        @Override
        public Long map(Long value) {
            double x = Math.random();
            double y = Math.random();
            return (x * x + y * y) < 1 ? 1L : 0L;
        }
    }

    // this class is of no importance for my question; however, it is relevant for the pi calculation
    public static final class SumReducer implements ReduceFunction<Long> {
        @Override
        public Long reduce(Long value1, Long value2) {
            return value1 + value2;
        }
    }

    // this is my class that computes PI; my question is whether such a class is valid in Flink on a cluster with parallel computation
    public static final class classPI {
        public Integer NumIter;
        private final ExecutionEnvironment env;
        public Double PI;

        // this is the constructor with another ExecutionEnvironment
        public classPI() {
            this.NumIter = 1000000;
            env = ExecutionEnvironment.getExecutionEnvironment();
        }

        // this is the method that contains all data transformations
        public Double compute() throws Exception {
            DataSet<Long> count = env.generateSequence(1, NumIter)
                                     .map(new Sampler())
                                     .reduce(new SumReducer());
            PI = 4.0 * count.collect().get(0) / NumIter;
            return PI;
        }
    }
}
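For readers following the arithmetic: the Sampler/SumReducer pair above is a Monte Carlo estimate. Each random point lands inside the unit quarter circle with probability pi/4, so 4.0 * hits / NumIter approximates pi. Below is a minimal plain-Java sketch of that same calculation, run sequentially in one JVM with no Flink involved. The class name MonteCarloPiSketch, its method names, and the seeded java.util.Random (used instead of Math.random() so results are reproducible) are illustrative assumptions, not part of the original code.

```java
import java.util.Random;

public class MonteCarloPiSketch {
    // Same test as Sampler: 1 if the random point (x, y) falls inside the unit quarter circle, else 0.
    static long sample(Random rnd) {
        double x = rnd.nextDouble();
        double y = rnd.nextDouble();
        return (x * x + y * y) < 1 ? 1L : 0L;
    }

    // Sequential stand-in for generateSequence(1, numIter).map(new Sampler()).reduce(new SumReducer()).
    static double estimatePi(long numIter, long seed) {
        Random rnd = new Random(seed);
        long hits = 0;
        for (long i = 1; i <= numIter; i++) {
            hits += sample(rnd); // the "reduce" step: a running sum of 0/1 samples
        }
        // The hit ratio approximates the area ratio pi/4, so scale by 4.
        return 4.0 * hits / numIter;
    }

    public static void main(String[] args) {
        System.out.println("We estimate Pi to be: " + estimatePi(1_000_000L, 42L));
    }
}
```

With a million samples the statistical error is roughly 0.002, so the printed value should agree with pi to about two decimal places.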

Re: Does Flink allows for encapsulation of transformations?

Posted by Chesnay Schepler <ch...@apache.org>.
Q1:
Whether one of your classes requires the env parameter depends on
whether you want to create a new source or set an ExecutionEnvironment
parameter inside the class.
If you don't, you can of course not pass it :)
I can't see anything that would prevent it from running on a cluster.

Q2:
Usually, parameters are passed to a UDF through the constructor. You
can use a DataSet within a function initializer block,
but it's rather unusual (this is in fact the first time I've seen it
done this way).

You can also just pass a long into the constructor; there is no need to
use a DataSet and collect().

final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

DataSet<Double> Radius = env.fromElements(10.0);
final long numIter = 1000000L;
DataSet<Double> pi = new classPI(env).compute(numIter);
DataSet<Double> LengthCircle = new classLengthCircle().computeLengthCircle(pi, Radius);

public static final class classPI implements Serializable {
    private final ExecutionEnvironment env;

    public classPI(ExecutionEnvironment env) {
        this.env = env;
    }

    public DataSet<Double> compute(final long numIter) throws Exception {
        return this.env.generateSequence(1, numIter)
                .map(new Sampler())
                .reduce(new SumReducer())
                .map(new MapFunction<Long, Double>() {
                    @Override
                    public Double map(Long arg0) throws Exception {
                        return arg0 * 4.0 / numIter;
                    }
                });
    }
}

Regards,
Chesnay
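The advice above to pass parameters through the constructor can be illustrated without Flink. Flink serializes user function objects before shipping them to the TaskManagers, so a value stored in a final field of the function travels with it. The sketch below assumes a hypothetical SerializableMapper interface as a stand-in for Flink's MapFunction and uses plain Java serialization to simulate the shipping step; ScaleToPi, roundTrip and demo are illustrative names, not Flink API.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class ConstructorParamSketch {
    // Hypothetical stand-in for Flink's MapFunction: a serializable single-method interface.
    interface SerializableMapper<I, O> extends Serializable {
        O map(I value);
    }

    // The parameter arrives through the constructor and lives in a final field,
    // so it is serialized together with the function object.
    static final class ScaleToPi implements SerializableMapper<Long, Double> {
        private final long numIter;

        ScaleToPi(long numIter) {
            this.numIter = numIter;
        }

        @Override
        public Double map(Long hits) {
            return hits * 4.0 / numIter;
        }
    }

    // Simulates shipping the function: serialize it and read back an independent copy.
    @SuppressWarnings("unchecked")
    static <T> T roundTrip(T obj) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(obj);
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (T) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    // Applies the deserialized ("shipped") copy to a sample hit count.
    static double demo() {
        SerializableMapper<Long, Double> shipped = roundTrip(new ScaleToPi(1_000_000L));
        return shipped.map(785_398L);
    }

    public static void main(String[] args) {
        System.out.println("pi from the shipped copy: " + demo());
    }
}
```

The design choice here is that the function carries a plain long rather than a DataSet, which is what "just pass a long into the constructor" amounts to.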


On 10.06.2016 02:46, Ser Kho wrote:
> [...]


Re: Does Flink allows for encapsulation of transformations?

Posted by Ser Kho <kh...@yahoo.com>.
Chesnay: I have two simple questions related to the previous ones about encapsulation of transformations.
Question 1. I have tried to extend my code using your suggestions and have come up with a small concern. First, your code:
public static void main(String[] args) throws Exception {
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    DataSet<Double> pi = new classPI(env).compute();
    new classThatNeedsPI(env).computeWhatever(pi); // append your transformations to pi
    env.execute();
}


Below is my code (the lines that mirror yours are very similar and work fine). The line of concern is the one that calls classLengthCircle. The issue is that I do not pass env to the constructor of the class classLengthCircle; instead, I pass the DataSet pi to the method computeLengthCircle(pi, Radius), together with the DataSet Radius, but the latter does not matter for the question. Then I proceed with transformations on this DataSet pi; see the class classLengthCircle below. It seems that the logic of this class and its method computeLengthCircle() does not require env at all. My question is whether this code will work on a cluster (it does work on a local computer).
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<Double> Radius = env.fromElements(10.0);
DataSet<Long> NumIter = env.fromElements(1000000L);
// this line is similar to the suggested one
DataSet<Double> pi = new classPI(env).compute(NumIter);
// this line is somewhat different from the suggested one, as it has no env in the constructor
DataSet<Double> LengthCircle = new classLengthCircle().computeLengthCircle(pi, Radius);
=========================
public static final class classLengthCircle {
    public DataSet<Double> computeLengthCircle(DataSet<Double> pi, DataSet<Double> Radius) {
        DataSet<Double> result = pi.cross(Radius).map(
            new MapFunction<Tuple2<Double, Double>, Double>() {
                @Override
                public Double map(Tuple2<Double, Double> arg0) throws Exception {
                    return 2 * arg0.f0 * arg0.f1;
                }
            });
        return result;
    }
}
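To make the cross() semantics in classLengthCircle concrete: pi.cross(Radius) pairs every element of pi with every element of Radius (f0 comes from pi, f1 from Radius), and the map then computes 2 * pi * r for each pair. Below is a rough plain-Java sketch of that cross-then-map step on in-memory lists. CrossMapSketch and circumferences are hypothetical names, and unlike Flink, the nested loops here fix the output order.

```java
import java.util.ArrayList;
import java.util.List;

public class CrossMapSketch {
    // Cartesian product of two lists followed by a map, mirroring pi.cross(Radius).map(...).
    static List<Double> circumferences(List<Double> pis, List<Double> radii) {
        List<Double> result = new ArrayList<>();
        for (double p : pis) {          // plays the role of f0
            for (double r : radii) {    // plays the role of f1
                result.add(2 * p * r);  // circumference = 2 * pi * r
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // One pi value crossed with two radii yields two circumferences.
        System.out.println(circumferences(List.of(3.14159), List.of(10.0, 1.0)));
    }
}
```

With single-element inputs, as in the question, the cross produces exactly one pair and therefore one result.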
================================================
Question 2:
I tried to pass the parameter DataSet NumIter into the MapFunction of the map() transformation (see the line Long N = NumIter.collect().get(0); in the code below). It seems this parameter appears in the MapFunction without being passed explicitly, since the line .map(new MapFunction<Long, Double>() never mentions NumIter. Is this approach the right way to pass a parameter into a MapFunction? Note that the code works fine on a single computer.
public static final class classPI implements Serializable {
    private final ExecutionEnvironment env;

    public classPI(ExecutionEnvironment env) { this.env = env; }

    public DataSet<Double> compute(final DataSet<Long> NumIter) throws Exception {
        return this.env.generateSequence(1, NumIter.collect().get(0))
                .map(new Sampler())
                .reduce(new SumReducer())
                .map(new MapFunction<Long, Double>() {
                    Long N = NumIter.collect().get(0);

                    @Override
                    public Double map(Long arg0) throws Exception {
                        return arg0 * 4.0 / N;
                    }
                });
    }
}

Thanks a lot for your time.
Ser
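On why Question 2 works locally: Java field initializers in an anonymous class run when the object is constructed, which here happens on the client while compute() is building the plan. In Flink that means NumIter.collect() runs (and triggers its own job execution) at that point, and the resulting N is then serialized along with the function. The plain-Java sketch below demonstrates only the Java half of that claim: the initializer runs once, and deserialized copies carry the value without re-running it. SerializableMapper, expensiveLookup and the other names are hypothetical stand-ins, with expensiveLookup playing the role of NumIter.collect().get(0).

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.concurrent.atomic.AtomicInteger;

public class InitializerTimingSketch {
    // Hypothetical stand-in for Flink's MapFunction.
    interface SerializableMapper<I, O> extends Serializable {
        O map(I value);
    }

    // Counts how often the "expensive" lookup runs.
    static final AtomicInteger LOOKUPS = new AtomicInteger();

    // Plays the role of NumIter.collect().get(0) in the question's code.
    static long expensiveLookup() {
        LOOKUPS.incrementAndGet();
        return 1_000_000L;
    }

    static SerializableMapper<Long, Double> buildMapper() {
        return new SerializableMapper<Long, Double>() {
            // Field initializer: evaluated once, when this anonymous object is created.
            final Long n = expensiveLookup();

            @Override
            public Double map(Long hits) {
                return hits * 4.0 / n;
            }
        };
    }

    // Simulates shipping the function: serialize it and read back an independent copy.
    @SuppressWarnings("unchecked")
    static <T> T roundTrip(T obj) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(obj);
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (T) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    // Builds one mapper, "ships" two copies, uses both, and reports how often the lookup ran.
    static int lookupsAfterTwoCopies() {
        LOOKUPS.set(0);
        SerializableMapper<Long, Double> original = buildMapper(); // the lookup happens here
        SerializableMapper<Long, Double> copyA = roundTrip(original);
        SerializableMapper<Long, Double> copyB = roundTrip(original);
        copyA.map(785_398L);
        copyB.map(785_398L);
        return LOOKUPS.get(); // deserialization does not re-run field initializers
    }

    static double mapOnCopy() {
        return roundTrip(buildMapper()).map(785_398L);
    }

    public static void main(String[] args) {
        System.out.println("lookups: " + lookupsAfterTwoCopies());
        System.out.println("pi from a copy: " + mapOnCopy());
    }
}
```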



On Tuesday, June 7, 2016 8:14 AM, Chesnay Schepler <ch...@apache.org> wrote:
> [...]

  

Re: Does Flink allows for encapsulation of transformations?

Posted by Chesnay Schepler <ch...@apache.org>.
1a. Ah, yeah, I see how it could work, but I wouldn't count on it in a
cluster.
You would (most likely) run the sub-job (calculating pi) only on a
single node.

1b. Different execution environments generally imply different Flink
programs.

2. Sure it does, since it's a normal Flink job. Yours, on the other hand,
doesn't, since the job calculating PI only runs on a single TaskManager.

3. There are 2 ways. You can either chain jobs like this (effectively
running 2 Flink programs in succession):

public static void main(String[] args) throws Exception {
    double pi = new classPI().compute();
    System.out.println("We estimate Pi to be: " + pi);
    new classThatNeedsPI().computeWhatever(pi); // feeds pi into an env.fromElements call and proceeds from there
}

or (if all building blocks are Flink programs) build a single job:

public static void main(String[] args) throws Exception {
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    DataSet<Double> pi = new classPI(env).compute();
    new classThatNeedsPI(env).computeWhatever(pi); // append your transformations to pi
    env.execute();
}

...

public DataSet<Double> compute() throws Exception {
    return this.env.generateSequence(1, NumIter)
            .map(new Sampler())
            .reduce(new SumReducer())
            .map(/* return 4.0 * x / NumIter */);
}

...

public ? computeWhatever(DataSet<Double> pi) throws Exception {
    ...
}
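The single-job variant works because DataSet transformations only describe a plan; nothing runs until env.execute() (or an eager call such as collect()). As a loose analogy only, java.util.stream pipelines are also lazy until a terminal operation, so the building-block style can be sketched with plain streams: each hypothetical method takes a stream, appends its transformations, and returns it without running anything. LazyPipelineSketch, scaleToPi and circumference are illustrative names; a Java stream is single-use and local, so this does not model distribution.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class LazyPipelineSketch {
    // Counts how many source elements were actually pulled through the pipeline.
    static final AtomicInteger PULLED = new AtomicInteger();

    // Hypothetical building block in the role of classPI.compute(): appends a step, runs nothing.
    static Stream<Double> scaleToPi(Stream<Long> hitCounts, long numIter) {
        return hitCounts.map(hits -> 4.0 * hits / numIter);
    }

    // Hypothetical building block in the role of classThatNeedsPI.computeWhatever(pi).
    static Stream<Double> circumference(Stream<Double> pi, double radius) {
        return pi.map(p -> 2 * p * radius);
    }

    // Composes both blocks and reports how many elements were pulled before the terminal operation.
    static int pulledBeforeTerminalOp() {
        PULLED.set(0);
        Stream<Long> source = Stream.of(785_398L).peek(h -> PULLED.incrementAndGet());
        Stream<Double> plan = circumference(scaleToPi(source, 1_000_000L), 10.0);
        int before = PULLED.get(); // still 0: the pipeline has only been described
        plan.collect(Collectors.toList());
        return before;
    }

    // A single terminal operation, analogous to one env.execute() for the whole plan.
    static List<Double> run() {
        PULLED.set(0);
        Stream<Long> source = Stream.of(785_398L).peek(h -> PULLED.incrementAndGet());
        return circumference(scaleToPi(source, 1_000_000L), 10.0).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println("elements pulled before terminal op: " + pulledBeforeTerminalOp());
        List<Double> result = run();
        System.out.println(result + " (elements pulled: " + PULLED.get() + ")");
    }
}
```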


On 07.06.2016 13:35, Ser Kho wrote:
> [...]


Re: Does Flink allows for encapsulation of transformations?

Posted by Ser Kho <kh...@yahoo.com>.
Chesnay:
1a. The code actually works; that is the point.
1b. What prevents a Flink program from having several execution environments?
2. I am not sure that your modification allows for parallelism. Does it?
3. This code is a simple example of how to write and organize large, complicated programs, where the resulting pi needs to be used in other DataSet transformations beyond classPI(). What should I do in that case?
Thanks a lot for the suggestions.

    On Tuesday, June 7, 2016 6:15 AM, Chesnay Schepler <ch...@apache.org> wrote:
 

  From what I can tell from your code, you are trying to execute a job within a job. This just doesn't work.
 
 Your main method should look like this:
 
 public static void main(String[] args) throws Exception 
{
  double pi = new classPI().compute();
   System.out.println("We estimate Pi to be: " + pi);   
} 
 
 
 On 06.06.2016 21:14, Ser Kho wrote:
  

  

Re: Does Flink allows for encapsulation of transformations?

Posted by Chesnay Schepler <ch...@apache.org>.
From what I can tell from your code, you are trying to execute a job
within a job. This just doesn't work.

Your main method should look like this:

public static void main(String[] args) throws Exception {
    double pi = new classPI().compute();
    System.out.println("We estimate Pi to be: " + pi);
}
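A minimal sketch extending the restructuring above, assuming the Flink 1.x DataSet API. So that the estimate stays usable in further transformations, the helper (hypothetical name `estimatePi`) takes the single `ExecutionEnvironment` created in `main()` and returns a `DataSet<Double>` instead of calling `collect()` itself, so no job is ever launched from inside a user function. The `CircleArea` consumer and the broadcast name `"pi"` are illustrative assumptions, not part of the original code.

```java
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.configuration.Configuration;

public class PiJob {

    // Hypothetical helper: builds the Pi pipeline on the caller's environment and
    // returns it as a DataSet. Nothing runs until the client calls collect()/execute().
    public static DataSet<Double> estimatePi(ExecutionEnvironment env, long numIter) {
        return env.generateSequence(1, numIter) // parallel source, split across subtasks
                .map(new Sampler())             // 1 if the random point is inside the unit circle
                .reduce(new SumReducer())       // total number of hits
                .map(new HitsToPi(numIter));    // 4 * hits / samples
    }

    public static void main(String[] args) throws Exception {
        // The only ExecutionEnvironment, created once on the client.
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<Double> pi = estimatePi(env, 1_000_000L);

        // The estimate feeds another transformation in the same job, here as a
        // broadcast variable (hypothetical circle-area example).
        DataSet<Double> areas = env.fromElements(1.0, 2.0, 3.0)
                .map(new CircleArea())
                .withBroadcastSet(pi, "pi");

        System.out.println("Circle areas: " + areas.collect());
    }

    public static final class Sampler implements MapFunction<Long, Long> {
        @Override
        public Long map(Long value) {
            double x = Math.random();
            double y = Math.random();
            return (x * x + y * y) < 1 ? 1L : 0L;
        }
    }

    public static final class SumReducer implements ReduceFunction<Long> {
        @Override
        public Long reduce(Long a, Long b) {
            return a + b;
        }
    }

    public static final class HitsToPi implements MapFunction<Long, Double> {
        private final long numIter;

        public HitsToPi(long numIter) {
            this.numIter = numIter;
        }

        @Override
        public Double map(Long hits) {
            return 4.0 * hits / numIter;
        }
    }

    // Reads the single Pi value from the broadcast set once per parallel instance.
    public static final class CircleArea extends RichMapFunction<Double, Double> {
        private double pi;

        @Override
        public void open(Configuration parameters) {
            pi = getRuntimeContext().<Double>getBroadcastVariable("pi").get(0);
        }

        @Override
        public Double map(Double radius) {
            return pi * radius * radius;
        }
    }
}
```

Since `generateSequence` is a parallel source, the sampling runs across all parallel subtasks on a cluster, and only the final single value is materialized. Keep in mind that every `collect()` call triggers its own execution of the plan, so collecting `pi` and `areas` separately would sample twice.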




On 06.06.2016 21:14, Ser Kho wrote:


Re: Does Flink allows for encapsulation of transformations?

Posted by Ser Kho <kh...@yahoo.com>.
Chesnay: Just want to thank you. I might have one or two related questions later on, but for now, just thanks.

 

    On Tuesday, June 7, 2016 8:18 AM, Greg Hogan <co...@greghogan.com> wrote:
 

 "The question is how to encapsulate numerous transformations into one object or may be a function in Apache Flink Java setting."

Implement CustomUnaryOperation. This can then be applied to a DataSet by calling `DataSet result = DataSet.runOperation(new MyOperation<>(...));`.

On Mon, Jun 6, 2016 at 3:14 PM, Ser Kho <kh...@yahoo.com> wrote:




  

Re: Does Flink allows for encapsulation of transformations?

Posted by Greg Hogan <co...@greghogan.com>.
"The question is how to encapsulate numerous transformations into one
object or may be a function in Apache Flink Java setting."

Implement CustomUnaryOperation. This can then be applied to a DataSet by
calling `DataSet result = DataSet.runOperation(new MyOperation<>(...));`.
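A sketch of this suggestion applied to the Pi example, assuming Flink 1.x, where `CustomUnaryOperation<IN, OUT>` lives in `org.apache.flink.api.java.operators` and declares `setInput(DataSet<IN>)` and `createResult()`. The class name `PiOperation` and its nested functions are hypothetical. Counting samples alongside hits (as a `Tuple2`) keeps the operation independent of the input size, so it needs no `NumIter` field and no `ExecutionEnvironment` of its own.

```java
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.CustomUnaryOperation;
import org.apache.flink.api.java.tuple.Tuple2;

// Hypothetical operation: turns any DataSet<Long> of sample ids into a
// single-element DataSet<Double> holding a Monte Carlo estimate of Pi.
public class PiOperation implements CustomUnaryOperation<Long, Double> {

    private DataSet<Long> input;

    @Override
    public void setInput(DataSet<Long> inputData) {
        this.input = inputData;
    }

    @Override
    public DataSet<Double> createResult() {
        return input
                .map(new Sample())      // (hit, 1) per sample
                .reduce(new SumPairs()) // (total hits, total samples)
                .map(new ToPi());       // 4 * hits / samples
    }

    // Emits (1, 1) if the random point lands inside the unit circle, else (0, 1).
    public static final class Sample implements MapFunction<Long, Tuple2<Long, Long>> {
        @Override
        public Tuple2<Long, Long> map(Long ignored) {
            double x = Math.random();
            double y = Math.random();
            return new Tuple2<>((x * x + y * y) < 1 ? 1L : 0L, 1L);
        }
    }

    public static final class SumPairs implements ReduceFunction<Tuple2<Long, Long>> {
        @Override
        public Tuple2<Long, Long> reduce(Tuple2<Long, Long> a, Tuple2<Long, Long> b) {
            return new Tuple2<>(a.f0 + b.f0, a.f1 + b.f1);
        }
    }

    public static final class ToPi implements MapFunction<Tuple2<Long, Long>, Double> {
        @Override
        public Double map(Tuple2<Long, Long> counts) {
            return 4.0 * counts.f0 / counts.f1;
        }
    }

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // runOperation calls setInput(...) and then createResult() on the operation.
        DataSet<Double> pi = env.generateSequence(1, 1_000_000L)
                .runOperation(new PiOperation());
        System.out.println("We estimate Pi to be: " + pi.collect().get(0));
    }
}
```

Callers only see `runOperation(new PiOperation())`; the individual transformations stay hidden inside the class, yet everything is still part of one job built on one `ExecutionEnvironment`, and the returned `DataSet<Double>` can be passed on to further operators.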

On Mon, Jun 6, 2016 at 3:14 PM, Ser Kho <kh...@yahoo.com> wrote:
