You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by AndreaKinn <ki...@hotmail.it> on 2017/09/02 17:23:21 UTC

How to fill flink's datastream

Hi, 
Excuse me for the unclear title but I don't know how to summarise the
question.
I'm using an external library integrated with Flink called Flink-HTM. It is
still a prototype.
Internally, it performs everything I want but I have a problem returning
evaluated values in a  printable datastream.
I posted here my question because I believe the problem is tied with Flink
and not with the library.

Essentially I have the following code in my main:

*/DataStream<Double> result = HTM.learn(kafkaStream, new
Harness.AnomalyNetwork())
				.select(new InferenceSelectFunction<Harness.KafkaRecord, Double>() {
                  @Override
                    public Double select(Tuple2<Harness.KafkaRecord,
NetworkInference> inference) throws Exception {
						return inference.f1.getAnomalyScore();
                    }
				});/*

Then I want to print the datastream "result".
Following the /learn/ method the flink-htm lib correctly performs many
operations on data. 
At the end of this computation, in another class I have a /DataStream<T,
NetworkInference>/ and essentially I have to call the overridden "/select/"
method on that/ Datastream<T,NetworkInference>/.

The code which would do that is:

*/final DataStream<Tuple2&lt;T, NetworkInference>> inferenceStream =
inferenceStreamBuilder.build();
                 
           return inferenceStream
                .map(new InferenceSelectMapper<T,
R>(clean(inferenceSelectFunction)))
                .returns(returnType);    / 
*
where /map/ and /returns/ methods are described in Flink's
/DataStream.class./

*/public <R> SingleOutputStreamOperator<R> map(MapFunction<T, R> mapper) {

		TypeInformation<R> outType =
TypeExtractor.getMapReturnTypes(clean(mapper), getType(),
				Utils.getCallLocationName(), true);

		return transform("Map", outType, new StreamMap<>(clean(mapper)));
	}/*

*/public SingleOutputStreamOperator<T> returns(TypeInformation<T> typeInfo)
{
		requireNonNull(typeInfo, "TypeInformation must not be null");
		
		transformation.setOutputType(typeInfo);
		return this;
	}/*

while /InferenceSelectMapper<T,R>/ is the following class:

*/private static class InferenceSelectMapper<T, R> implements
MapFunction<Tuple2&lt;T, NetworkInference>, R> {

	private final InferenceSelectFunction<T, R> inferenceSelectFunction;

        public InferenceSelectMapper(InferenceSelectFunction<T, R>
inferenceSelectFunction) {
        		this.inferenceSelectFunction = inferenceSelectFunction;
         }

        @Override
        public R map(Tuple2<T, NetworkInference> value) throws Exception {
        		return inferenceSelectFunction.select(value);
        }
    }/*

which implements Flink's /MapFunction/. I absolutely need the program call
the /InferenceSelectMapper.map()/ method to call my defined "/select/"
function, unfortunately this doesn't happen. As consequence of that, in main
method and in the IDE console, I suppose the /DataStream result/ is not
filled and none output is printed, which is the my fundamental problem.

Since I'm not a Flink expert I don't know how to perform many operations at
"lower level".
Honestly I don't understand exactly what /map/ and /returns/ methods of
/DataStream.class/ do. I thought a lot about it and I also tried to find a
way to call /InferenceSelectMapper.map()/ method but I don't know how to
extract the /Tuple2<T, NetworkInference>/ from the
/DataStream<Tuple2&lt;...>>/.

I'm absolutely sure that the /map/ function I need in
/InferenceSelectMethod/ is not called because it doesn't appear in call
hierarchy and also adding a print instruction that is not showed. 

Please, can you help me to solve this? I've been stuck on it for a week
while the lib's owner doesn't reply to my mails. 
Sorry for the length.



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: How to fill flink's datastream

Posted by AndreaKinn <ki...@hotmail.it>.
Hi,
I tried to remove the returns function but if I do it, the program returns
an error (curious since the return value is a Double).

I'm absolutely sure env.execute() is called because I see other streams
printed.

The program is connected, I followed exactly the example showed in the
library, I think that's a bug and I need to solve it. 

The inference stream where is called the map function is surely full.

Thank you


Fabian Hueske-2 wrote
> Hi Andrea,
> 
> a MapFunction calls its map() function for each stream element and returns
> exactly one result value.
> MapFunctions are used for 1-to-1 transformations.
> The returns() method allows to specify the return type of an operator, in
> your case the MapOperator. It is only necessary if Flink cannot
> automatically determine the return type of an operator.
> 
> It's not easy to identify what is going on from the code you posted.
> Are you sure the program is executed, i.e., did you call env.execute()?
> Are all parts of the program connected?
> Are you sure that the input stream of the Map operator emits records?
> 
> Best, Fabian
> 
> 
> 2017-09-02 19:23 GMT+02:00 AndreaKinn &lt;

> kinn6aer@

> &gt;:
> 
>> Hi,
>> Excuse me for the unclear title but I don't know how to summarise the
>> question.
>> I'm using an external library integrated with Flink called Flink-HTM. It
>> is
>> still a prototype.
>> Internally, it performs everything I want but I have a problem returning
>> evaluated values in a  printable datastream.
>> I posted here my question because I believe the problem is tied with
>> Flink
>> and not with the library.
>>
>> Essentially I have the following code in my main:
>>
>> */DataStream
> <Double>
>  result = HTM.learn(kafkaStream, new
>> Harness.AnomalyNetwork())
>>                                 .select(new
>> InferenceSelectFunction&lt;Harness.KafkaRecord,
> &gt; Double>() {
>>                   @Override
>>                     public Double select(Tuple2&lt;Harness.KafkaRecord,
> &gt; NetworkInference> inference) throws Exception {
>>                                                 return
>> inference.f1.getAnomalyScore();
>>                     }
>>                                 });/*
>>
>> Then I want to print the datastream "result".
>> Following the /learn/ method the flink-htm lib correctly performs many
>> operations on data.
>> At the end of this computation, in another class I have a
>> /DataStream&lt;T,
> &gt; NetworkInference>/ and essentially I have to call the overridden
> "/select/"
>> method on that/ Datastream&lt;T,NetworkInference&gt;/.
>>
>> The code which would do that is:
>>
>> */final DataStream&lt;Tuple2&amp;lt;T, NetworkInference&gt;>
>> inferenceStream =
>> inferenceStreamBuilder.build();
>>
>>            return inferenceStream
>>                 .map(new InferenceSelectMapper&lt;T,
> &gt; R>(clean(inferenceSelectFunction)))
>>                 .returns(returnType);    /
>> *
>> where /map/ and /returns/ methods are described in Flink's
>> /DataStream.class./
>>
>> */public 
> <R>
>  SingleOutputStreamOperator
> <R>
>  map(MapFunction&lt;T, R&gt; mapper) {
>>
>>                 TypeInformation
> <R>
>  outType =
>> TypeExtractor.getMapReturnTypes(clean(mapper), getType(),
>>                                 Utils.getCallLocationName(), true);
>>
>>                 return transform("Map", outType, new
>> StreamMap<>(clean(mapper)));
>>         }/*
>>
>> */public SingleOutputStreamOperator
> <T>
>  returns(TypeInformation
> <T>
>  typeInfo)
>> {
>>                 requireNonNull(typeInfo, "TypeInformation must not be
>> null");
>>
>>                 transformation.setOutputType(typeInfo);
>>                 return this;
>>         }/*
>>
>> while /InferenceSelectMapper&lt;T,R&gt;/ is the following class:
>>
>> */private static class InferenceSelectMapper&lt;T, R&gt; implements
>> MapFunction&lt;Tuple2&amp;lt;T, NetworkInference&gt;, R> {
>>
>>         private final InferenceSelectFunction&lt;T, R&gt;
>> inferenceSelectFunction;
>>
>>         public InferenceSelectMapper(InferenceSelectFunction&lt;T, R&gt;
>> inferenceSelectFunction) {
>>                         this.inferenceSelectFunction =
>> inferenceSelectFunction;
>>          }
>>
>>         @Override
>>         public R map(Tuple2&lt;T, NetworkInference&gt; value) throws
>> Exception {
>>                         return inferenceSelectFunction.select(value);
>>         }
>>     }/*
>>
>> which implements Flink's /MapFunction/. I absolutely need the program
>> call
>> the /InferenceSelectMapper.map()/ method to call my defined "/select/"
>> function, unfortunately this doesn't happen. As consequence of that, in
>> main
>> method and in the IDE console, I suppose the /DataStream result/ is not
>> filled and none output is printed, which is the my fundamental problem.
>>
>> Since I'm not a Flink expert I don't know how to perform many operations
>> at
>> "lower level".
>> Honestly I don't understand exactly what /map/ and /returns/ methods of
>> /DataStream.class/ do. I thought a lot about it and I also tried to find
>> a
>> way to call /InferenceSelectMapper.map()/ method but I don't know how to
>> extract the /Tuple2&lt;T, NetworkInference&gt;/ from the
>> /DataStream&lt;Tuple2&amp;lt;...&gt;>/.
>>
>> I'm absolutely sure that the /map/ function I need in
>> /InferenceSelectMethod/ is not called because it doesn't appear in call
>> hierarchy and also adding a print instruction that is not showed.
>>
>> Please, can you help me to solve this? I've been stuck on it for a week
>> while the lib's owner doesn't reply to my mails.
>> Sorry for the length.
>>
>>
>>
>> --
>> Sent from: http://apache-flink-user-mailing-list-archive.2336050.
>> n4.nabble.com/
>>





--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: How to fill flink's datastream

Posted by Fabian Hueske <fh...@gmail.com>.
Hi Andrea,

a MapFunction calls its map() function for each stream element and returns
exactly one result value.
MapFunctions are used for 1-to-1 transformations.
The returns() method allows to specify the return type of an operator, in
your case the MapOperator. It is only necessary if Flink cannot
automatically determine the return type of an operator.

It's not easy to identify what is going on from the code you posted.
Are you sure the program is executed, i.e., did you call env.execute()?
Are all parts of the program connected?
Are you sure that the input stream of the Map operator emits records?

Best, Fabian


2017-09-02 19:23 GMT+02:00 AndreaKinn <ki...@hotmail.it>:

> Hi,
> Excuse me for the unclear title but I don't know how to summarise the
> question.
> I'm using an external library integrated with Flink called Flink-HTM. It is
> still a prototype.
> Internally, it performs everything I want but I have a problem returning
> evaluated values in a  printable datastream.
> I posted here my question because I believe the problem is tied with Flink
> and not with the library.
>
> Essentially I have the following code in my main:
>
> */DataStream<Double> result = HTM.learn(kafkaStream, new
> Harness.AnomalyNetwork())
>                                 .select(new InferenceSelectFunction<Harness.KafkaRecord,
> Double>() {
>                   @Override
>                     public Double select(Tuple2<Harness.KafkaRecord,
> NetworkInference> inference) throws Exception {
>                                                 return
> inference.f1.getAnomalyScore();
>                     }
>                                 });/*
>
> Then I want to print the datastream "result".
> Following the /learn/ method the flink-htm lib correctly performs many
> operations on data.
> At the end of this computation, in another class I have a /DataStream<T,
> NetworkInference>/ and essentially I have to call the overridden "/select/"
> method on that/ Datastream<T,NetworkInference>/.
>
> The code which would do that is:
>
> */final DataStream<Tuple2&lt;T, NetworkInference>> inferenceStream =
> inferenceStreamBuilder.build();
>
>            return inferenceStream
>                 .map(new InferenceSelectMapper<T,
> R>(clean(inferenceSelectFunction)))
>                 .returns(returnType);    /
> *
> where /map/ and /returns/ methods are described in Flink's
> /DataStream.class./
>
> */public <R> SingleOutputStreamOperator<R> map(MapFunction<T, R> mapper) {
>
>                 TypeInformation<R> outType =
> TypeExtractor.getMapReturnTypes(clean(mapper), getType(),
>                                 Utils.getCallLocationName(), true);
>
>                 return transform("Map", outType, new
> StreamMap<>(clean(mapper)));
>         }/*
>
> */public SingleOutputStreamOperator<T> returns(TypeInformation<T> typeInfo)
> {
>                 requireNonNull(typeInfo, "TypeInformation must not be
> null");
>
>                 transformation.setOutputType(typeInfo);
>                 return this;
>         }/*
>
> while /InferenceSelectMapper<T,R>/ is the following class:
>
> */private static class InferenceSelectMapper<T, R> implements
> MapFunction<Tuple2&lt;T, NetworkInference>, R> {
>
>         private final InferenceSelectFunction<T, R>
> inferenceSelectFunction;
>
>         public InferenceSelectMapper(InferenceSelectFunction<T, R>
> inferenceSelectFunction) {
>                         this.inferenceSelectFunction =
> inferenceSelectFunction;
>          }
>
>         @Override
>         public R map(Tuple2<T, NetworkInference> value) throws Exception {
>                         return inferenceSelectFunction.select(value);
>         }
>     }/*
>
> which implements Flink's /MapFunction/. I absolutely need the program call
> the /InferenceSelectMapper.map()/ method to call my defined "/select/"
> function, unfortunately this doesn't happen. As consequence of that, in
> main
> method and in the IDE console, I suppose the /DataStream result/ is not
> filled and none output is printed, which is the my fundamental problem.
>
> Since I'm not a Flink expert I don't know how to perform many operations at
> "lower level".
> Honestly I don't understand exactly what /map/ and /returns/ methods of
> /DataStream.class/ do. I thought a lot about it and I also tried to find a
> way to call /InferenceSelectMapper.map()/ method but I don't know how to
> extract the /Tuple2<T, NetworkInference>/ from the
> /DataStream<Tuple2&lt;...>>/.
>
> I'm absolutely sure that the /map/ function I need in
> /InferenceSelectMethod/ is not called because it doesn't appear in call
> hierarchy and also adding a print instruction that is not showed.
>
> Please, can you help me to solve this? I've been stuck on it for a week
> while the lib's owner doesn't reply to my mails.
> Sorry for the length.
>
>
>
> --
> Sent from: http://apache-flink-user-mailing-list-archive.2336050.
> n4.nabble.com/
>