Posted to user@flink.apache.org by Andres Angel <in...@gmail.com> on 2019/07/29 18:30:27 UTC

FlatMap returning Row<> based on ArrayList elements()

Hello everyone,

I need to pass input data into an anonymous function that turns it into a
Row with several fields. Originally I would have done something like
Row.of(1,2,3,4), but these elements can change on the fly inside my
function. This is why I decided to store them in a list, and right now it
looks something like this:

[image: image.png]

Now I need my out Collector to emit a Row built from these elements. I
checked the Flink documentation, and lambda functions seem to have
limitations:
https://ci.apache.org/projects/flink/flink-docs-stable/dev/java_lambdas.html
Then I thought I could loop over my ArrayList, copy the elements into a
Tuple, and pass that tuple as Row.of(myTuple):

Tuple mytuple = Tuple.newInstance(5);
for (int i = 0; i < pelements.size(); i++) {
    mytuple.setField(pelements.get(i), i);
}
// Row.of(mytuple) receives a single argument, so the Row has one field (the tuple)
out.collect(Row.of(mytuple));


However, it doesn't work, because the tuple is parsed as a single element in
the sqlQuery step. How could I do something like:

pelements.forEach(n->out.collect(Row.of(n)));

Thanks so much
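Why the Tuple version ends up as a single column follows from the signature of Row.of, which is the varargs method Row.of(Object... values): a single non-array argument (a Tuple, a List) becomes one field, while an Object[] is spread into one field per element. The plain-Java sketch below mimics that signature with a hypothetical rowOf helper so that it runs without Flink on the classpath; it is an illustration, not Flink code.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class VarargsDemo {
    // Hypothetical stand-in with the same parameter shape as Flink's
    // Row.of(Object... values); it returns the field array it received.
    static Object[] rowOf(Object... values) {
        return values;
    }

    public static void main(String[] args) {
        List<Object> pelements = Arrays.asList("5d2df2c2e7370c7843dad9ca", 359731, 13333196);

        // A single non-array argument (like a Tuple or a List) becomes ONE field.
        Object[] wrapped = rowOf(pelements);

        // An Object[] is spread across the varargs: one field per element.
        Object[] spread = rowOf(pelements.toArray());

        // forEach(n -> out.collect(Row.of(n))) would emit one single-field row per element.
        List<Object[]> emitted = new ArrayList<>();
        pelements.forEach(n -> emitted.add(rowOf(n)));

        // prints "1 3 3"
        System.out.println(wrapped.length + " " + spread.length + " " + emitted.size());
    }
}
```

Applied to the question, out.collect(Row.of(pelements.toArray())) should give one Row with a field per list element, whereas pelements.forEach(n -> out.collect(Row.of(n))) would emit several single-field Rows.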

Re:Re: FlatMap returning Row<> based on ArrayList elements()

Posted by Haibo Sun <su...@163.com>.
Hi AU,


> The problem with this approach is that I'm looking for a standard anonymous FlatMap function that could return, each time: 1. a different number of elements in the array, and 2. elements whose data types can vary as well. In other words, the output is not fixed, yet the TypeInformation I declare fixes it for every execution.

As far as I know, there is no such standard flatMap function. A table definition requires a fixed number of columns, and even if Flink can infer the column types, those types must also be fixed. For your case, the table should have as many columns as the maximum possible number of elements. If a record has fewer elements, pad the remaining columns (for example with null values) before returning the row. Where elements in the same column may have different types, you can convert them to the single column type defined by the table, or define a custom type that can handle all of those types.
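A minimal plain-Java sketch of the padding-plus-conversion idea above, assuming a hypothetical maximum of five columns and String as the uniform column type (one of the two options mentioned; a custom type is the other). The names FixedWidthRows, MAX_COLUMNS and toFixedWidthStrings are made up for illustration.

```java
import java.util.Arrays;
import java.util.List;

class FixedWidthRows {
    // Hypothetical maximum number of columns; it must match the declared table schema.
    static final int MAX_COLUMNS = 5;

    // Pads the record with nulls up to `width` and converts each non-null value
    // to a String, so every column has one fixed type (String).
    static Object[] toFixedWidthStrings(List<?> values, int width) {
        if (values.size() > width) {
            throw new IllegalArgumentException(
                "record has " + values.size() + " elements, schema allows " + width);
        }
        Object[] fields = new Object[width]; // slots not filled below stay null (the padding)
        for (int i = 0; i < values.size(); i++) {
            Object v = values.get(i);
            fields[i] = (v == null) ? null : String.valueOf(v);
        }
        return fields;
    }

    public static void main(String[] args) {
        Object[] fields = toFixedWidthStrings(
            Arrays.asList("5d2df2c2e7370c7843dad9ca", 359731, 13333196), MAX_COLUMNS);
        // prints [5d2df2c2e7370c7843dad9ca, 359731, 13333196, null, null]
        System.out.println(Arrays.toString(fields));
    }
}
```

Inside a flatMap, the returned array could be passed as out.collect(Row.of(toFixedWidthStrings(value, MAX_COLUMNS))), with the stream's type declared once as a RowTypeInfo of MAX_COLUMNS BasicTypeInfo.STRING_TYPE_INFO entries. Records longer than the maximum are rejected here rather than silently truncated; whether to drop, truncate, or fail is a design choice.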



Best,
Haibo

In reply to Andres Angel, 2019-08-07 23:05:51.

Re: FlatMap returning Row<> based on ArrayList elements()

Posted by Andres Angel <in...@gmail.com>.
Hello Victor,

You are totally right. So the question becomes: can Flink handle cases like
this, where the type info would have to be defined per row, and the Table
would infer the columns from comma-separated values or something similar?

thanks
AU

In reply to Victor Wong, Wed, Aug 7, 2019 at 10:33 AM.

Re: FlatMap returning Row<> based on ArrayList elements()

Posted by Victor Wong <ji...@outlook.com>.
Hi Andres,



I’d like to share my thoughts:

When you register a “Table”, you need to specify its “schema”, so how can you register the table when the number of elements/columns and the data types are both nondeterministic?
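To make the point about a fixed schema concrete, here is a plain-Java illustration (not a Flink API) in which the column count and column types are declared once and every record is checked against them. The SCHEMA array mirrors the five types used in this thread; SchemaCheck and conforms are hypothetical names.

```java
class SchemaCheck {
    // Declared once, up front, the way a table schema must be.
    static final Class<?>[] SCHEMA = {
        String.class, Integer.class, Integer.class, Integer.class, Integer.class
    };

    // True if the record has exactly one value per declared column and each
    // non-null value is an instance of that column's declared type.
    static boolean conforms(Object[] fields, Class<?>[] schema) {
        if (fields.length != schema.length) {
            return false;
        }
        for (int i = 0; i < fields.length; i++) {
            if (fields[i] != null && !schema[i].isInstance(fields[i])) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        Object[] ok = {"5d2df2c2e7370c7843dad9ca", 359731, 13333196, 156789925, 619381};
        Object[] tooShort = {"5d2df2c2e7370c7843dad9ca", 359731};
        Object[] wrongType = {42, 359731, 13333196, 156789925, 619381};
        // prints "true false false"
        System.out.println(conforms(ok, SCHEMA) + " " + conforms(tooShort, SCHEMA)
            + " " + conforms(wrongType, SCHEMA));
    }
}
```

A record with a different number of values or a mismatched type cannot fit the declared schema, which is exactly the situation a nondeterministic width or type would create.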

Correct me if I misunderstood your meaning.



Best,

Victor

In reply to Andres Angel, Wednesday, August 7, 2019 at 9:55 PM.

Re: FlatMap returning Row<> based on ArrayList elements()

Posted by Andres Angel <in...@gmail.com>.
Hello everyone, let me be more precise about what I'm looking for in the end,
because your example is right and accurately shows how to turn an array into
a Row object.
I have done it seamlessly:

out.collect(Row.of(pelements.toArray()));

Then I printed it, and the output is as expected:

5d2df2c2e7370c7843dad9ca,359731,13333196,156789925,619381

Now I need to register this DataStream as a table, and this is roughly how I
plan to do it:

tenv.registerDataStream("newtable", ds, "col1,col2,col3,col4,col5");

However, the registration fails with an error because I need to specify the
RowTypeInfo. Here is the catch: I know I could use something like:


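import java.util.List;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;

TypeInformation<?>[] types = {
    BasicTypeInfo.STRING_TYPE_INFO,
    BasicTypeInfo.INT_TYPE_INFO,
    BasicTypeInfo.INT_TYPE_INFO,
    BasicTypeInfo.INT_TYPE_INFO,
    BasicTypeInfo.INT_TYPE_INFO};

// The elements mix a String id with ints, so the input list is List<Object>.
DataStream<Row> ds = previousds.flatMap(new FlatMapFunction<List<Object>, Row>() {
    @Override
    public void flatMap(List<Object> value, Collector<Row> out) throws Exception {
        out.collect(Row.of(value.toArray()));
    }
}).returns(new RowTypeInfo(types)); // returns() takes a TypeInformation<Row>, not an array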


The problem with this approach is that I'm looking for a standard anonymous
FlatMap function that could return, each time: 1. a different number of
elements in the array, and 2. elements whose data types can vary as well. In
other words, the output is not fixed, yet the TypeInformation I declare fixes
it for every execution.

How could I approach this?

thanks so much
AU
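If the maximum width is settled before the job starts (for example, read from configuration), the field list for registerDataStream can be derived from that same number instead of being typed by hand. The sketch below is plain Java; FieldNames, fieldExpression and the "col" prefix are illustrative assumptions.

```java
import java.util.StringJoiner;

class FieldNames {
    // Builds "col1,col2,...,colN" for a fixed, known maximum width.
    // The "col" prefix is only an illustrative naming choice.
    static String fieldExpression(int width) {
        if (width < 1) {
            throw new IllegalArgumentException("width must be >= 1");
        }
        StringJoiner joiner = new StringJoiner(",");
        for (int i = 1; i <= width; i++) {
            joiner.add("col" + i);
        }
        return joiner.toString();
    }

    public static void main(String[] args) {
        // prints col1,col2,col3,col4,col5
        System.out.println(fieldExpression(5));
    }
}
```

The resulting string would be the third argument of tenv.registerDataStream("newtable", ds, ...), and the TypeInformation<?>[] passed to RowTypeInfo would need the same length, so both halves of the schema stay in step.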


In reply to Haibo Sun, Wed, Aug 7, 2019 at 3:57 AM.

Re:FlatMap returning Row<> based on ArrayList elements()

Posted by Haibo Sun <su...@163.com>.
Hi Andres Angel,


I guess people (including me) don't understand your problem. I don't know whether the following sample code is what you want; if not, can you describe the problem more clearly?


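import java.util.Arrays;
import java.util.List;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.fromElements(Arrays.asList(1, 2, 3, 4, 5))
    .flatMap(new FlatMapFunction<List<Integer>, Row>() {
        @Override
        public void flatMap(List<Integer> value, Collector<Row> out) throws Exception {
            // The array is spread by Row.of(Object...), giving one field per element
            out.collect(Row.of(value.toArray(new Integer[0])));
        }
    }).print();

env.execute("test job");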


Best,
Haibo

In reply to Andres Angel, 2019-07-30 02:30:27.