Posted to user@flink.apache.org by AndreaKinn <ki...@hotmail.it> on 2017/10/25 16:27:10 UTC

StreamTransformation object

Hi,
I'm using an external library with Flink, and I'm trying to implement a
slotSharingGroup(String) method on it.
To do so, I looked at Flink's SingleOutputStreamOperator class to see how
slotSharingGroup(String) is implemented.

An excerpt:

public class SingleOutputStreamOperator<T> extends DataStream<T> {

	SingleOutputStreamOperator(StreamExecutionEnvironment environment,
			StreamTransformation<T> transformation) {
		super(environment, transformation);
	}

	SingleOutputStreamOperator<T> slotSharingGroup(String slotSharingGroup) {
		transformation.setSlotSharingGroup(slotSharingGroup);
		return this;
	}
}

So I changed the constructor of the external library class that has to offer
the slotSharingGroup() method, making it follow the
SingleOutputStreamOperator template more closely.

Now my problem is how to call it (see below), because I don't understand what
the StreamTransformation<T> object among the constructor parameters of
SingleOutputStreamOperator is, or how to obtain it in the main class.

This is the method I call:

DataStream<Tuple7<String, String, Date, String, String, Double, Double>>
LCxAccResult = HTM.learn(env, /* what StreamTransformation<T> here? */).slotSharingGroup("group");

Hope you can help me, thanks in advance
Andrea



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: StreamTransformation object

Posted by AndreaKinn <ki...@hotmail.it>.
Thanks for your help. I solved the issue by refactoring HTMStream and adding
new APIs.




Re: StreamTransformation object

Posted by Tony Wei <to...@gmail.com>.
Hi Andrea,

I don't think you need to touch `StreamTransformation`. If you can get the
result from build(), you can do the same thing I mentioned earlier: cast
it to SingleOutputStreamOperator.
Then you can pass it to the select function to add the next operator, and
set another slotSharing group on the result.

Best Regards,
Tony Wei

2017-10-27 17:18 GMT+08:00 AndreaKinn <ki...@hotmail.it>:

> I'm trying to create an API to get the result from build(), but I'm still
> stuck on the StreamTransformation, which is essential to implementing
> slotSharingGroup(...). I have to provide it from the main class.

Re: StreamTransformation object

Posted by AndreaKinn <ki...@hotmail.it>.
I'm trying to create an API to get the result from build(), but I'm still
stuck on the StreamTransformation, which is essential to implementing
slotSharingGroup(...). I have to provide it from the main class.


Tony Wei wrote
> Hi Andrea,
> 
> The `learn` operator is defined in this method [1]. If you need to set its
> slotSharing group, you should add `slotSharingGroup(...)` after line 97
> [2] or add a new API to get the result from `inferenceStreamBuilder.build()`.
> 
> Best Regards,
> Tony Wei
> 
> [1]
> https://github.com/htm-community/flink-htm/blob/master/flink-htm-streaming-java/src/main/java/org/numenta/nupic/flink/streaming/api/HTMStream.java#L148
> [2]
> https://github.com/htm-community/flink-htm/blob/master/flink-htm-streaming-java/src/main/java/org/numenta/nupic/flink/streaming/api/HTMStream.java#L97

Re: StreamTransformation object

Posted by Tony Wei <to...@gmail.com>.
Hi Andrea,

The `learn` operator is defined in this method [1]. If you need to set its
slotSharing group, you should add `slotSharingGroup(...)` after line 97
[2] or add a new API to get the result from `inferenceStreamBuilder.build()`.
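
The second option (a new accessor for the built stream) could look roughly like the sketch below. It uses stand-in classes only: `MiniHtmStream`, `Builder`, `learnOperator()`, `MiniStream` and `MiniOperator` are hypothetical simplifications, not the real flink-htm or Flink types, and the sketch assumes that build() hands back the same operator every time it is called.

```java
// Stand-in sketch of "a new API to get the result from build()".
// None of these classes are the real flink-htm or Flink classes.
class ExposeBuildSketch {

    // Plays the role of DataStream: no slotSharingGroup() on this type.
    static class MiniStream {
        final String name;
        MiniStream(String name) { this.name = name; }
    }

    // Plays the role of SingleOutputStreamOperator.
    static class MiniOperator extends MiniStream {
        String slotSharingGroup = "default";
        MiniOperator(String name) { super(name); }

        MiniOperator slotSharingGroup(String group) {
            this.slotSharingGroup = group;
            return this;
        }
    }

    // Assumption: the builder creates the "learn" operator once and caches it.
    static class Builder {
        private MiniStream built;

        MiniStream build() {
            if (built == null) {
                built = new MiniOperator("learn");
            }
            return built;
        }
    }

    static class MiniHtmStream {
        private final Builder builder = new Builder();

        // The new accessor: hand the built "learn" operator to the caller,
        // cast to the operator type so a slot sharing group can be set on it.
        MiniOperator learnOperator() {
            return (MiniOperator) builder.build();
        }

        // Downstream step (input wiring omitted for brevity).
        MiniOperator select(String name) {
            return new MiniOperator(name);
        }
    }

    public static void main(String[] args) {
        MiniHtmStream htm = new MiniHtmStream();
        htm.learnOperator().slotSharingGroup("learnGroup");
        MiniOperator selected = htm.select("select").slotSharingGroup("selectGroup");

        System.out.println(htm.learnOperator().slotSharingGroup); // learnGroup
        System.out.println(selected.slotSharingGroup);            // selectGroup
    }
}
```

With an accessor like this, the caller never has to construct a `StreamTransformation`; it only needs a reference to the operator that the library already built.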

Best Regards,
Tony Wei

[1]
https://github.com/htm-community/flink-htm/blob/master/flink-htm-streaming-java/src/main/java/org/numenta/nupic/flink/streaming/api/HTMStream.java#L148
[2]
https://github.com/htm-community/flink-htm/blob/master/flink-htm-streaming-java/src/main/java/org/numenta/nupic/flink/streaming/api/HTMStream.java#L97

2017-10-26 17:36 GMT+08:00 AndreaKinn <ki...@hotmail.it>:

> Can you be clearer about this part?
>
> I really appreciate your help
>
>
> Tony Wei wrote
> > you need to refactor `HTMStream` to expose
> > `InferenceStreamBuilder.build()`.

Re: StreamTransformation object

Posted by AndreaKinn <ki...@hotmail.it>.
Can you be clearer about this part?

I really appreciate your help


Tony Wei wrote
> you need to refactor `HTMStream` to expose
> `InferenceStreamBuilder.build()`.






Re: StreamTransformation object

Posted by Tony Wei <to...@gmail.com>.
Hi Andrea,

This way, you will only set a slotSharing group on the select operator, and
the learn operator will remain in the default group.
If you want to set it on the learn operator as well, I am afraid that you
need to refactor `HTMStream` to expose `InferenceStreamBuilder.build()`.
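
A minimal sketch of why only select is affected, using a stand-in `Op` class rather than the real Flink types (`Op` and `then()` are hypothetical names). It models the one thing that matters here: slotSharingGroup() stores the group on the operator it is called on, the way `transformation.setSlotSharingGroup(...)` does in the excerpt from the first post, and it does not reach back to the upstream operator.

```java
// Stand-in sketch: Op imitates an operator that stores its own slot sharing
// group. It is NOT a Flink class.
class PerOperatorGroupSketch {

    static class Op {
        final String name;
        final Op input;                       // upstream operator, null for a source
        String slotSharingGroup = "default";

        Op(String name, Op input) {
            this.name = name;
            this.input = input;
        }

        // Adds a downstream operator that reads from this one.
        Op then(String downstreamName) {
            return new Op(downstreamName, this);
        }

        // Only this operator's field is written; the input is left alone.
        Op slotSharingGroup(String group) {
            this.slotSharingGroup = group;
            return this;
        }
    }

    public static void main(String[] args) {
        Op learn = new Op("learn", null);
        Op select = learn.then("select").slotSharingGroup("group");

        System.out.println(select.name + ": " + select.slotSharingGroup); // select: group
        System.out.println(learn.name + ": " + learn.slotSharingGroup);   // learn: default
    }
}
```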

Best Regards,
Tony Wei

2017-10-26 17:01 GMT+08:00 AndreaKinn <ki...@hotmail.it>:

> Mmm, looks good. This solution would be great.
> Does this set a slotSharing group for both the learn and select
> methods, and not only for select?
> I believed I needed to call slotSharingGroup directly on the return value
> of learn.

Re: StreamTransformation object

Posted by AndreaKinn <ki...@hotmail.it>.
Mmm, looks good. This solution would be great.
Does this set a slotSharing group for both the learn and select
methods, and not only for select?
I believed I needed to call slotSharingGroup directly on the return value of
learn.





Re: StreamTransformation object

Posted by Tony Wei <to...@gmail.com>.
Hi Andrea,

I skimmed through that external library [1], and I think the object returned
by the "select" function can be cast to the `SingleOutputStreamOperator` type [2].
How about trying the following code?

DataStream<Tuple7<String, String, Date, String, String, Double, Double>>
LCxAccResult = HTM.learn(LCxAccStream, new Harness.AnomalyNetwork())
      .select(new InferenceSelectFunction<Harness.KafkaRecord,
Tuple7<String, String, Date, String, String, Double, Double>>() {...});
((SingleOutputStreamOperator<?>) LCxAccResult).slotSharingGroup("...");

Best Regards,
Tony Wei

[1]
https://github.com/htm-community/flink-htm/blob/master/flink-htm-streaming-java/src/main/java/org/numenta/nupic/flink/streaming/api/HTMStream.java#L99
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.3/api/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.html#returns-org.apache.flink.api.common.typeinfo.TypeInformation-

2017-10-26 16:31 GMT+08:00 AndreaKinn <ki...@hotmail.it>:

> Sorry Tony, it is my fault: my first post was wrong. My actual
> situation is the following:
>
>
> DataStream<Tuple7<String, String, Date, String, String, Double, Double>>
> LCxAccResult = HTM.learn(LCxAccStream, new Harness.AnomalyNetwork())
>                 .select(new InferenceSelectFunction<Harness.KafkaRecord,
> Tuple7<String, String, Date, String, String, Double, Double>>() {...});
>
>
> So the return value of "learn" is an HTMStream object and the return
> value of "select" is a DataStream, whereas I need slotSharingGroup
> on learn. So I think I can't set SingleOutputStreamOperator as the return
> type of learn. I believe (I hope not, since I don't have a clue how to do
> it :D) I need to define slotSharingGroup directly in the HTMStream class,
> as in the first post.

Re: StreamTransformation object

Posted by AndreaKinn <ki...@hotmail.it>.
Sorry Tony, it is my fault: my first post was wrong. My actual
situation is the following:


DataStream<Tuple7<String, String, Date, String, String, Double, Double>>
LCxAccResult = HTM.learn(LCxAccStream, new Harness.AnomalyNetwork())
				.select(new InferenceSelectFunction<Harness.KafkaRecord,
Tuple7<String, String, Date, String, String, Double, Double>>() {...});


So the return value of "learn" is an HTMStream object and the return
value of "select" is a DataStream, whereas I need slotSharingGroup
on learn. So I think I can't set SingleOutputStreamOperator as the return
type of learn. I believe (I hope not, since I don't have a clue how to do
it :D) I need to define slotSharingGroup directly in the HTMStream class,
as in the first post.




Re: StreamTransformation object

Posted by Tony Wei <to...@gmail.com>.
Hi Andrea,

How about returning `SingleOutputStreamOperator` from
`HTM.learn()`, instead of creating a new method in the external library?
I guess that function calls the Flink API internally, and a
transformation in Flink, such as map, actually returns
`SingleOutputStreamOperator` [1], so I think it is easier to just change the
return type of that function.
Then you can use the functionality of `SingleOutputStreamOperator` directly.
I hope this helps.
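
As a rough illustration of the return-type idea, here is a sketch with stand-in classes instead of the real Flink types. `MiniStream` and `MiniOperator` are hypothetical simplifications of `DataStream` and `SingleOutputStreamOperator`, and `learnAsStream` / `learnAsOperator` are hypothetical library methods; the real `HTM.learn()` may be structured differently.

```java
import java.util.function.Function;

// Stand-in sketch: these classes only imitate the shape of Flink's
// DataStream / SingleOutputStreamOperator; they are NOT the real API.
class ReturnTypeSketch {

    // Plays the role of DataStream<T>: no slotSharingGroup() here.
    static class MiniStream<T> {
        final String name;
        MiniStream(String name) { this.name = name; }

        // Like DataStream.map(), the transformation returns the operator subtype.
        // The function itself is ignored; only the types matter in this sketch.
        <R> MiniOperator<R> map(String opName, Function<T, R> fn) {
            return new MiniOperator<>(opName);
        }
    }

    // Plays the role of SingleOutputStreamOperator<T>.
    static class MiniOperator<T> extends MiniStream<T> {
        String slotSharingGroup = "default";
        MiniOperator(String name) { super(name); }

        MiniOperator<T> slotSharingGroup(String group) {
            this.slotSharingGroup = group;
            return this;
        }
    }

    // Hypothetical library method that hides the operator behind the base type.
    static MiniStream<Integer> learnAsStream(MiniStream<String> in) {
        return in.map("learn", String::length);
    }

    // Same body, but the declared return type keeps the operator visible.
    static MiniOperator<Integer> learnAsOperator(MiniStream<String> in) {
        return in.map("learn", String::length);
    }

    public static void main(String[] args) {
        MiniStream<String> source = new MiniStream<>("source");

        // learnAsStream(source).slotSharingGroup("group"); // would not compile
        MiniOperator<Integer> learned = learnAsOperator(source).slotSharingGroup("group");
        System.out.println(learned.name + " -> " + learned.slotSharingGroup); // learn -> group
    }
}
```

Both methods return the same kind of object at runtime; only the declared type decides whether the caller can chain slotSharingGroup() without a cast.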

Best Regards,
Tony Wei

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.3/api/java/org/apache/flink/streaming/api/datastream/DataStream.html#map-org.apache.flink.api.common.functions.MapFunction-
