Posted to user@flink.apache.org by Philipp Bussche <ph...@gmail.com> on 2016/10/14 22:07:14 UTC

Task and Operator Monitoring via JMX / naming

Hi there,
I am struggling to understand what I am looking at after enabling JMX
metric reporting on my taskmanager.
The job I am trying this out with has 1 source, 2 map functions (where one
is a RichMap) and 3 sinks.
This is how I have written my Job:

DataStream<Invitation> invitations = streaming
        .addSource(new FlinkKafkaConsumer09<>(
                dsp.getPropertyAsString("kafka.invitation.topic"),
                new InvitationSchema(),
                kafkaProps))
        .name("KafkaSource");
invitations
        .addSink(new PostgresqlInvitationDetailsSink<>(psqlConfig))
        .name("InvitationDetailSink");

DataStream<Tuple2<String, String>> tokens = invitations
        .map(new TokenExtractor())
        .name("TokenMapStream");
tokens
        .addSink(new PostgresqlInvitationTokenSink<>(psqlConfig))
        .name("InvitationTokenSink");

DataStream<Tuple4<String, String, String, String>> invitationResponses = invitations
        .map(new InvitationDetailsExtractor(psqlConfig, tokenToSenderMapping))
        .name("InvitationDetailsRichMapStream");
invitationResponses
        .addSink(new Neo4JInvitationSink<>(neo4jConfig))
        .name("InvitationRelationshipSink");

streaming.execute("InvitationJob");

Somehow I was expecting to have metrics representing the source, the sinks
and the operators, however instead of 6 entries in my JMX tree I only have
4. Please see screenshot attached. Also I was somehow expecting the JMX
objects to be named like my task / operator names but it has all sorts of
prefix/suffix magic around the names. Finally I have one custom metric which
obviously is attached to my RichMapFunction (InvitationDetailsExtractor).
However the custom metric (invitationDetailsAdded) shows up under an object
where one of the keys (which I would expect to be set to the operation name)
is a combination of the prefix "Sink" plus the name of the first sink that I
am using plus the name of the first map function (which is not the
RichMapFunction actually: my custom metric "invitationDetailsAdded" shows up
under "(Sink-_InvitationDetailSink-_TokenMapStream" which is very confusing
because this metric is actually incremented as part of the
InvitationDetailsRichMapStream). 
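
One way to see exactly which objects and keys were registered, independent of how a JMX console renders the tree, is to dump the ObjectNames and their key properties. The following is only a minimal sketch using the standard javax.management API. The class name ListMetricBeans and the helper keyProperties are made up for illustration, and the domain pattern is a placeholder, since the domain used by Flink's JMX reporter is not stated in this thread. As written it queries the MBean server of the JVM it runs in; for a TaskManager it would need an MBeanServerConnection obtained from a remote JMX connector instead.

```java
import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import javax.management.MBeanServerConnection;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;

final class ListMetricBeans {

    // Hypothetical helper: returns, for every MBean whose domain matches the
    // given pattern, its canonical name mapped to its key properties.
    static Map<String, Map<String, String>> keyProperties(
            MBeanServerConnection connection, String domainPattern) {
        Set<ObjectName> names;
        try {
            // "<domain>:*" matches all MBeans in that domain, whatever their keys.
            names = connection.queryNames(new ObjectName(domainPattern + ":*"), null);
        } catch (MalformedObjectNameException | IOException e) {
            throw new IllegalStateException("MBean query failed", e);
        }
        Map<String, Map<String, String>> result = new TreeMap<>();
        for (ObjectName name : names) {
            // Sort the keys so that the output is stable between runs.
            result.put(name.getCanonicalName(), new TreeMap<>(name.getKeyPropertyList()));
        }
        return result;
    }

    public static void main(String[] args) {
        // Placeholder default; pass the domain (or a pattern) of interest as an argument.
        String pattern = args.length > 0 ? args[0] : "java.lang";
        keyProperties(ManagementFactory.getPlatformMBeanServer(), pattern)
                .forEach((bean, keys) -> System.out.println(bean + " -> " + keys));
    }
}
```

Comparing the printed key properties with the names given via name(...) in the job shows directly which key carries which operator or task name.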

Can somebody please explain what I can expect from metrics exposed via JMX
(should they really represent my tasks and operations) and why the naming is
so strange ?

Thanks
Philipp

<http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/n9560/Screen_Shot_2016-10-15_at_00.png> 



--
View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Task-and-Operator-Monitoring-via-JMX-naming-tp9560.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com.

Re: Task and Operator Monitoring via JMX / naming

Posted by Chesnay Schepler <ch...@apache.org>.
It will be in the master tomorrow.

On 20.10.2016 18:50, Philipp Bussche wrote:
> Thanks Chesnay !
>
> I am not too familiar with the release cycles here but was wondering when
> one could expect your fix to be in the master of Flink ? Should I create my
> own build for the moment maybe ?
>
> Thanks.


Re: Task and Operator Monitoring via JMX / naming

Posted by Philipp Bussche <ph...@gmail.com>.
Thanks Chesnay !

I am not too familiar with the release cycles here but was wondering when
one could expect your fix to be in the master of Flink ? Should I create my
own build for the moment maybe ?

Thanks.




Re: Task and Operator Monitoring via JMX / naming

Posted by Chesnay Schepler <ch...@apache.org>.
Well the issue is the following:

the metric system assumes the following naming scheme for tasks based on 
the DataSet API and simple streaming jobs: [CHAIN] operatorName1 [=> 
operatorName2 [ ...]]
To retrieve the operator name the above is split by "=>", giving us a 
String[] of all operator names in a task, from which we then select the 
correct one based on the position in the chain.

However, the Streaming API has some fancy chaining stuff going on, where 
multiple operations can be chained to a single one which results in a 
name like this: operatorName1 => (operatorName2, operatorName3)

For both op2 and op3 the chain index is identical (since for a tree 
structure the index is the depth), resulting in both picking 
(operatorName2, operatorName3) as their name, which is obviously wrong.
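
The effect can be reproduced with a few lines of plain Java. This is only a sketch of the heuristic as described above, not the actual Flink code; the class ChainNameSplit and the method operatorName are hypothetical names.

```java
final class ChainNameSplit {

    // Hypothetical helper mirroring the described heuristic: drop an optional
    // "CHAIN " prefix, split the task name on "=>", and pick the entry at the
    // operator's position in the chain.
    static String operatorName(String taskName, int chainIndex) {
        String name = taskName.startsWith("CHAIN ")
                ? taskName.substring("CHAIN ".length())
                : taskName;
        String[] parts = name.split("=>");
        return parts[chainIndex].trim();
    }

    public static void main(String[] args) {
        // Linear chain: each index maps to exactly one operator name.
        String linear = "CHAIN operatorName1 => operatorName2";
        System.out.println(operatorName(linear, 0)); // operatorName1
        System.out.println(operatorName(linear, 1)); // operatorName2

        // Branching chain: op2 and op3 both sit at depth 1, so both lookups
        // use index 1 and receive the same combined string.
        String branching = "operatorName1 => (operatorName2, operatorName3)";
        System.out.println(operatorName(branching, 1)); // (operatorName2, operatorName3)
    }
}
```

The leading "(" in the last result matches the stray parenthesis reported for the JMX entry.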

The solution (which I already implemented, sorry for that) is to simply 
stop inferring the operator names from the task (it was hacky to begin 
with) and just encode them in the configuration for the operator.
This can be seen here: 
https://github.com/zentol/flink/commit/7f439525a26504e98b72f2d39b987ac878464419
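
As a rough illustration of the idea only (this is not the code from the commit; OperatorConfig and metricName are hypothetical names), each operator carries its own name, so nothing has to be parsed back out of the task name:

```java
final class ExplicitOperatorNames {

    // Hypothetical per-operator configuration; not a Flink class.
    static final class OperatorConfig {
        private final String operatorName;

        OperatorConfig(String operatorName) {
            this.operatorName = operatorName;
        }

        String getOperatorName() {
            return operatorName;
        }
    }

    // The name used for metrics comes straight from the operator's own
    // configuration, so the shape of the task name no longer matters.
    static String metricName(OperatorConfig config) {
        return config.getOperatorName();
    }

    public static void main(String[] args) {
        // Two operators at the same chain depth keep distinct names.
        OperatorConfig op2 = new OperatorConfig("operatorName2");
        OperatorConfig op3 = new OperatorConfig("operatorName3");
        System.out.println(metricName(op2));
        System.out.println(metricName(op3));
    }
}
```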

Regards,
Chesnay

On 20.10.2016 14:21, Philipp Bussche wrote:
> Thanks Chesnay,
> I am happy to share more around my environment and do additional testing for
> this.
> Also I would be happy to help fixing if we see there might be an issue in
> the code somewhere.
> In fact I am still trying to get a Hacktoberfest T-Shirt and I am still
> short a few pull requests  ;)


Re: Task and Operator Monitoring via JMX / naming

Posted by Philipp Bussche <ph...@gmail.com>.
Thanks Chesnay,
I am happy to share more around my environment and do additional testing for
this.
Also I would be happy to help fixing if we see there might be an issue in
the code somewhere.
In fact I am still trying to get a Hacktoberfest T-Shirt and I am still
short a few pull requests  ;)





Re: Task and Operator Monitoring via JMX / naming

Posted by Chesnay Schepler <ch...@apache.org>.
This is completely unintended behavior; you should never have to adjust 
your topology so the metric system gets the names right.

I'll take a deep look into this tomorrow ;)

Regards,
Chesnay

On 20.10.2016 08:50, Philipp Bussche wrote:
> Some further observations: I had a Job which was taking events from a Kafka
> topic and sending them to two sinks, where for one of them a Map operation
> would happen first. When creating one event stream and sending it to the two
> sinks, the JMX representation was not showing both sinks and the naming of
> the Map operation was also not right. But when creating two event streams in
> the job (basically two Kafka consumers doing exactly the same thing) and then
> sending each to one sink, the naming changed and seems to look like what I
> would expect.
> A question remains, though: is it best practice anyway to do only one thing
> per Job (like one map operation and one distribution to a sink), so that
> having multiple streams is the way to go, or is what I see in my environment
> still unexpected behaviour that should be fixed?
> Thanks


Re: Task and Operator Monitoring via JMX / naming

Posted by Philipp Bussche <ph...@gmail.com>.
Some further observations: I had a Job which was taking events from a Kafka
topic and sending them to two sinks, where for one of them a Map operation
would happen first. When creating one event stream and sending it to the two
sinks, the JMX representation was not showing both sinks and the naming of
the Map operation was also not right. But when creating two event streams in
the job (basically two Kafka consumers doing exactly the same thing) and then
sending each to one sink, the naming changed and seems to look like what I
would expect.
A question remains, though: is it best practice anyway to do only one thing
per Job (like one map operation and one distribution to a sink), so that
having multiple streams is the way to go, or is what I see in my environment
still unexpected behaviour that should be fixed?
Thanks




Re: Task and Operator Monitoring via JMX / naming

Posted by Philipp Bussche <ph...@gmail.com>.
Thanks Chesnay.

I had a look at what the JMX representation looks like for a Task
Manager which has one of the example Jobs deployed
(https://ci.apache.org/projects/flink/flink-docs-release-1.1/quickstart/run_example_quickstart.html)
and this looks correct.
I assume at this point that the naming gets confused because I have
multiple sinks in my Job and more than one operator on the same stream.
Maybe this is not expected and I should only have one operator and one sink
per Job? However, the job itself does what it is supposed to, so I would only
change this for the sake of monitoring as it stands right now.
Also, it seems to make a difference when things happen in the job.
I had a print (sink) of the wikipedia source stream right after the
source is read, and after moving this print statement to the very end of the
job class the representation in JMX changed. I would expect the naming of
sinks and operators to always be the same regardless of when they happen, no?

Thanks
Philipp




Re: Task and Operator Monitoring via JMX / naming

Posted by Chesnay Schepler <ch...@apache.org>.
Hello Philipp,

the relevant names are stored in the OperatorMetricGroup/TaskMetricGroup 
classes in flink-runtime.

The name for a task is extracted directly from the 
TaskDeploymentDescriptor in TaskManagerJobMetricGroup#addTask().
The name for a streaming operator that the metric system uses is set in 
AbstractStreamOperator#setup() and is derived
from the task name.

Regards,
Chesnay

On 15.10.2016 10:08, Philipp Bussche wrote:
> Thanks Chesnay, this is on Flink 1.1.3
> Please also note that e.g. the first item in the list which has the custom
> metric attached to it starts with a leading "(". It might be that the
> parsing of the names is not working quite as expected.
> I was trying to find out where these names come from but wasn't able to
> identify it in the source. If you know and want to give me a hint I can also
> do some more debugging !
> Thanks
> Philipp


Re: Task and Operator Monitoring via JMX / naming

Posted by Philipp Bussche <ph...@gmail.com>.
Thanks Chesnay, this is on Flink 1.1.3
Please also note that e.g. the first item in the list which has the custom
metric attached to it starts with a leading "(". It might be that the
parsing of the names is not working quite as expected.
I was trying to find out where these names come from but wasn't able to
identify it in the source. If you know and want to give me a hint I can also
do some more debugging !
Thanks
Philipp




Re: Task and Operator Monitoring via JMX / naming

Posted by Chesnay Schepler <ch...@apache.org>.
Hello Philipp,

there is certainly something very wrong here.

What you _should_ see is 6 entries, 1 for each operator; 2-3 more for 
the tasks the operators are executed in and the taskmanager stuff.

Usually, operator metrics use the name that you configured, like 
"TokenMapStream", whereas tasks use the concatenation of all operator 
names joined with =>, as in "KafkaSource => TokenMapStream".
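
In other words, for a simple linear chain the expected naming could be sketched like this (plain Java for illustration only; TaskNameFromOperators and taskName are hypothetical names, not Flink code):

```java
import java.util.Arrays;
import java.util.List;

final class TaskNameFromOperators {

    // A task is named after all operators it executes, joined with " => ".
    static String taskName(List<String> operatorNames) {
        return String.join(" => ", operatorNames);
    }

    public static void main(String[] args) {
        List<String> chain = Arrays.asList("KafkaSource", "TokenMapStream");
        // Each operator keeps the name configured via name(...).
        for (String operator : chain) {
            System.out.println("operator: " + operator);
        }
        System.out.println("task: " + taskName(chain));
    }
}
```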

I will look into this, I've never seen these issues before.

One more thing, which version of Flink are you currently using?

Regards,
Chesnay

