You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@flink.apache.org by Andra Lungu <lu...@gmail.com> on 2015/06/27 16:19:35 UTC

Monitoring a Flink Job

Hey guys,

Me again :) So now that my wonderful job finishes, I would like to monitor
it a bit (i.e. build some charts on the number of messages per vertex,
compute the total amount of time elapsed per computation per vertex, etc).

The main computational-intensive operation is a coGroup. There, within the
iteration I count the number of "messages" sent and then I do simple:

Files.append(messages, messagesTempFile, Charsets.UTF_8);

The problem is that with this approach, I get a deadlock (yes!! Now that I
know the code itself works I am positive that the deadlock comes from the
append -this regarding my previous mail-). It is normal if you come to
think of it 200 something threads are trying to write to the same file...

A possible workaround is this one:

public class Singleton {
    private static final Singleton inst= new Singleton();

    private Singleton() {
        super();
    }

    public synchronized void writeToFile(String str) {
        // Do whatever
    }

    public Singleton getInstance() {
        return inst;
    }
}

Singleton.getInstance().writeToFile("Hello!!");

However, I am not sure how well Flink plays with synchronised....

Is there a smarter way to do it?

Thanks!

Andra

Re: Monitoring a Flink Job

Posted by Fabian Hueske <fh...@gmail.com>.
Hi Andra,

sure, if you do the logging for each record (or group of records) using a
list accumulator is a very bad idea.

If you don't need exact stats for each vertex but rather a distribution
over all vertices, you can use a histogram accumulator.
If you need exact vertex stats, I'd go with Vasia's proposal.

2015-06-29 22:48 GMT+02:00 Vasiliki Kalavri <va...@gmail.com>:

> Andra,
>
> why don't you simply print to standard output and gather your metrics from
> the taskmanagers' log files after execution?
> Wouldn't that work for you?
>
> -V.
>
> On 29 June 2015 at 22:36, Andra Lungu <lu...@gmail.com> wrote:
>
> > Caution! I am getting philosophical. Stop me if I'm talking nonsense!
> >
> > You are suggesting a list that will have one or two entries per vertex =
> > (approx) billions. Won't this over-saturate my memory? I am already
> filling
> > it with lots of junk resulted from the computation...
> >
> > On Mon, Jun 29, 2015 at 1:58 PM, Fabian Hueske <fh...@gmail.com>
> wrote:
> >
> > > Have you tried to use a custom accumulator that just appends to a list?
> > >
> > > 2015-06-29 12:59 GMT+02:00 Andra Lungu <lu...@gmail.com>:
> > >
> > > > Hey Fabian,
> > > >
> > > > I am aware of the way open, preSuperstep(), postSuperstep() etc can
> > help
> > > me
> > > > within an interation, unfortunately I am writing my own method here.
> I
> > > > could try to briefly describe it:
> > > >
> > > > public static final class PropagateNeighborValues implements
> > > > NeighborsFunctionWithVertexValue(...) {
> > > >     @Override
> > > >     public void iterateNeighbors(Iterable..., Collector...) {
> > > >              while(iterator.hasNext) neighbors++;
> > > >              // and I would need something like
> > > >              appendToFile(myAwesomeFile, neighbors);
> > > >     }
> > > > }
> > > >
> > > > Open() and synchronised are definitely not doing the trick for me
> right
> > > > now.
> > > > Any other way !? :(
> > > >
> > > > On Mon, Jun 29, 2015 at 11:36 AM, Fabian Hueske <fh...@gmail.com>
> > > wrote:
> > > >
> > > > > You can measure the time of each iteration in the open() methods
> > > > operators
> > > > > within an iteration. open() will be called before each iteration.
> > > > > The times can be collected by either printing to std out (you need
> to
> > > > > collect the files then...) or by implementing a list accumulator.
> > Each
> > > > time
> > > > > should include the iteration# und parallel task id.
> > > > > After the execution, the acculuator will be available in the
> > execution
> > > > > result.
> > > > >
> > > > > Accumulators can of course also be used to collect number of
> > messages,
> > > > etc.
> > > > >
> > > > > Best, Fabian
> > > > >
> > > > > 2015-06-29 9:55 GMT+02:00 Flavio Pompermaier <pompermaier@okkam.it
> >:
> > > > >
> > > > > > Why don't you use Flink dataset output functions (like
> writeAsText,
> > > > > > writeAsCsv, etc..)?
> > > > > > Or if they are not sufficient you can implement/override your own
> > > > > > InputFormat.
> > > > > >
> > > > > > From what is my experience static variables are evil in
> distributed
> > > > > > environments..
> > > > > > Moreover, one of the main strengths of Flink are its input/output
> > > APIs
> > > > > so I
> > > > > > would avoid to write to a file in that way.
> > > > > >
> > > > > > Of course, dataset.append() will be a very convenient API to add
> > > > (IMHO).
> > > > > >
> > > > > > Best,
> > > > > > Flavio
> > > > > >
> > > > > >
> > > > > > On Sat, Jun 27, 2015 at 4:19 PM, Andra Lungu <
> > lungu.andra@gmail.com>
> > > > > > wrote:
> > > > > >
> > > > > > > Hey guys,
> > > > > > >
> > > > > > > Me again :) So now that my wonderful job finishes, I would like
> > to
> > > > > > monitor
> > > > > > > it a bit (i.e. build some charts on the number of messages per
> > > > vertex,
> > > > > > > compute the total amount of time elapsed per computation per
> > > vertex,
> > > > > > etc).
> > > > > > >
> > > > > > > The main computational-intensive operation is a coGroup. There,
> > > > within
> > > > > > the
> > > > > > > iteration I count the number of "messages" sent and then I do
> > > simple:
> > > > > > >
> > > > > > > Files.append(messages, messagesTempFile, Charsets.UTF_8);
> > > > > > >
> > > > > > > The problem is that with this approach, I get a deadlock (yes!!
> > Now
> > > > > that
> > > > > > I
> > > > > > > know the code itself works I am positive that the deadlock
> comes
> > > from
> > > > > the
> > > > > > > append -this regarding my previous mail-). It is normal if you
> > come
> > > > to
> > > > > > > think of it 200 something threads are trying to write to the
> same
> > > > > file...
> > > > > > >
> > > > > > > A possible workaround is this one:
> > > > > > >
> > > > > > > public class Singleton {
> > > > > > >     private static final Singleton inst= new Singleton();
> > > > > > >
> > > > > > >     private Singleton() {
> > > > > > >         super();
> > > > > > >     }
> > > > > > >
> > > > > > >     public synchronized void writeToFile(String str) {
> > > > > > >         // Do whatever
> > > > > > >     }
> > > > > > >
> > > > > > >     public Singleton getInstance() {
> > > > > > >         return inst;
> > > > > > >     }
> > > > > > > }
> > > > > > >
> > > > > > > Singleton.getInstance().writeToFile("Hello!!");
> > > > > > >
> > > > > > > However, I am not sure how well Flink plays with
> synchronised....
> > > > > > >
> > > > > > > Is there a smarter way to do it?
> > > > > > >
> > > > > > > Thanks!
> > > > > > >
> > > > > > > Andra
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>

Re: Monitoring a Flink Job

Posted by Vasiliki Kalavri <va...@gmail.com>.
Andra,

why don't you simply print to standard output and gather your metrics from
the taskmanagers' log files after execution?
Wouldn't that work for you?

-V.

On 29 June 2015 at 22:36, Andra Lungu <lu...@gmail.com> wrote:

> Caution! I am getting philosophical. Stop me if I'm talking nonsense!
>
> You are suggesting a list that will have one or two entries per vertex =
> (approx) billions. Won't this over-saturate my memory? I am already filling
> it with lots of junk resulted from the computation...
>
> On Mon, Jun 29, 2015 at 1:58 PM, Fabian Hueske <fh...@gmail.com> wrote:
>
> > Have you tried to use a custom accumulator that just appends to a list?
> >
> > 2015-06-29 12:59 GMT+02:00 Andra Lungu <lu...@gmail.com>:
> >
> > > Hey Fabian,
> > >
> > > I am aware of the way open, preSuperstep(), postSuperstep() etc can
> help
> > me
> > > within an interation, unfortunately I am writing my own method here. I
> > > could try to briefly describe it:
> > >
> > > public static final class PropagateNeighborValues implements
> > > NeighborsFunctionWithVertexValue(...) {
> > >     @Override
> > >     public void iterateNeighbors(Iterable..., Collector...) {
> > >              while(iterator.hasNext) neighbors++;
> > >              // and I would need something like
> > >              appendToFile(myAwesomeFile, neighbors);
> > >     }
> > > }
> > >
> > > Open() and synchronised are definitely not doing the trick for me right
> > > now.
> > > Any other way !? :(
> > >
> > > On Mon, Jun 29, 2015 at 11:36 AM, Fabian Hueske <fh...@gmail.com>
> > wrote:
> > >
> > > > You can measure the time of each iteration in the open() methods
> > > operators
> > > > within an iteration. open() will be called before each iteration.
> > > > The times can be collected by either printing to std out (you need to
> > > > collect the files then...) or by implementing a list accumulator.
> Each
> > > time
> > > > should include the iteration# und parallel task id.
> > > > After the execution, the acculuator will be available in the
> execution
> > > > result.
> > > >
> > > > Accumulators can of course also be used to collect number of
> messages,
> > > etc.
> > > >
> > > > Best, Fabian
> > > >
> > > > 2015-06-29 9:55 GMT+02:00 Flavio Pompermaier <po...@okkam.it>:
> > > >
> > > > > Why don't you use Flink dataset output functions (like writeAsText,
> > > > > writeAsCsv, etc..)?
> > > > > Or if they are not sufficient you can implement/override your own
> > > > > InputFormat.
> > > > >
> > > > > From what is my experience static variables are evil in distributed
> > > > > environments..
> > > > > Moreover, one of the main strengths of Flink are its input/output
> > APIs
> > > > so I
> > > > > would avoid to write to a file in that way.
> > > > >
> > > > > Of course, dataset.append() will be a very convenient API to add
> > > (IMHO).
> > > > >
> > > > > Best,
> > > > > Flavio
> > > > >
> > > > >
> > > > > On Sat, Jun 27, 2015 at 4:19 PM, Andra Lungu <
> lungu.andra@gmail.com>
> > > > > wrote:
> > > > >
> > > > > > Hey guys,
> > > > > >
> > > > > > Me again :) So now that my wonderful job finishes, I would like
> to
> > > > > monitor
> > > > > > it a bit (i.e. build some charts on the number of messages per
> > > vertex,
> > > > > > compute the total amount of time elapsed per computation per
> > vertex,
> > > > > etc).
> > > > > >
> > > > > > The main computational-intensive operation is a coGroup. There,
> > > within
> > > > > the
> > > > > > iteration I count the number of "messages" sent and then I do
> > simple:
> > > > > >
> > > > > > Files.append(messages, messagesTempFile, Charsets.UTF_8);
> > > > > >
> > > > > > The problem is that with this approach, I get a deadlock (yes!!
> Now
> > > > that
> > > > > I
> > > > > > know the code itself works I am positive that the deadlock comes
> > from
> > > > the
> > > > > > append -this regarding my previous mail-). It is normal if you
> come
> > > to
> > > > > > think of it 200 something threads are trying to write to the same
> > > > file...
> > > > > >
> > > > > > A possible workaround is this one:
> > > > > >
> > > > > > public class Singleton {
> > > > > >     private static final Singleton inst= new Singleton();
> > > > > >
> > > > > >     private Singleton() {
> > > > > >         super();
> > > > > >     }
> > > > > >
> > > > > >     public synchronized void writeToFile(String str) {
> > > > > >         // Do whatever
> > > > > >     }
> > > > > >
> > > > > >     public Singleton getInstance() {
> > > > > >         return inst;
> > > > > >     }
> > > > > > }
> > > > > >
> > > > > > Singleton.getInstance().writeToFile("Hello!!");
> > > > > >
> > > > > > However, I am not sure how well Flink plays with synchronised....
> > > > > >
> > > > > > Is there a smarter way to do it?
> > > > > >
> > > > > > Thanks!
> > > > > >
> > > > > > Andra
> > > > > >
> > > > >
> > > >
> > >
> >
>

Re: Monitoring a Flink Job

Posted by Andra Lungu <lu...@gmail.com>.
Caution! I am getting philosophical. Stop me if I'm talking nonsense!

You are suggesting a list that will have one or two entries per vertex =
(approx) billions. Won't this over-saturate my memory? I am already filling
it with lots of junk resulted from the computation...

On Mon, Jun 29, 2015 at 1:58 PM, Fabian Hueske <fh...@gmail.com> wrote:

> Have you tried to use a custom accumulator that just appends to a list?
>
> 2015-06-29 12:59 GMT+02:00 Andra Lungu <lu...@gmail.com>:
>
> > Hey Fabian,
> >
> > I am aware of the way open, preSuperstep(), postSuperstep() etc can help
> me
> > within an interation, unfortunately I am writing my own method here. I
> > could try to briefly describe it:
> >
> > public static final class PropagateNeighborValues implements
> > NeighborsFunctionWithVertexValue(...) {
> >     @Override
> >     public void iterateNeighbors(Iterable..., Collector...) {
> >              while(iterator.hasNext) neighbors++;
> >              // and I would need something like
> >              appendToFile(myAwesomeFile, neighbors);
> >     }
> > }
> >
> > Open() and synchronised are definitely not doing the trick for me right
> > now.
> > Any other way !? :(
> >
> > On Mon, Jun 29, 2015 at 11:36 AM, Fabian Hueske <fh...@gmail.com>
> wrote:
> >
> > > You can measure the time of each iteration in the open() methods
> > operators
> > > within an iteration. open() will be called before each iteration.
> > > The times can be collected by either printing to std out (you need to
> > > collect the files then...) or by implementing a list accumulator. Each
> > time
> > > should include the iteration# und parallel task id.
> > > After the execution, the acculuator will be available in the execution
> > > result.
> > >
> > > Accumulators can of course also be used to collect number of messages,
> > etc.
> > >
> > > Best, Fabian
> > >
> > > 2015-06-29 9:55 GMT+02:00 Flavio Pompermaier <po...@okkam.it>:
> > >
> > > > Why don't you use Flink dataset output functions (like writeAsText,
> > > > writeAsCsv, etc..)?
> > > > Or if they are not sufficient you can implement/override your own
> > > > InputFormat.
> > > >
> > > > From what is my experience static variables are evil in distributed
> > > > environments..
> > > > Moreover, one of the main strengths of Flink are its input/output
> APIs
> > > so I
> > > > would avoid to write to a file in that way.
> > > >
> > > > Of course, dataset.append() will be a very convenient API to add
> > (IMHO).
> > > >
> > > > Best,
> > > > Flavio
> > > >
> > > >
> > > > On Sat, Jun 27, 2015 at 4:19 PM, Andra Lungu <lu...@gmail.com>
> > > > wrote:
> > > >
> > > > > Hey guys,
> > > > >
> > > > > Me again :) So now that my wonderful job finishes, I would like to
> > > > monitor
> > > > > it a bit (i.e. build some charts on the number of messages per
> > vertex,
> > > > > compute the total amount of time elapsed per computation per
> vertex,
> > > > etc).
> > > > >
> > > > > The main computational-intensive operation is a coGroup. There,
> > within
> > > > the
> > > > > iteration I count the number of "messages" sent and then I do
> simple:
> > > > >
> > > > > Files.append(messages, messagesTempFile, Charsets.UTF_8);
> > > > >
> > > > > The problem is that with this approach, I get a deadlock (yes!! Now
> > > that
> > > > I
> > > > > know the code itself works I am positive that the deadlock comes
> from
> > > the
> > > > > append -this regarding my previous mail-). It is normal if you come
> > to
> > > > > think of it 200 something threads are trying to write to the same
> > > file...
> > > > >
> > > > > A possible workaround is this one:
> > > > >
> > > > > public class Singleton {
> > > > >     private static final Singleton inst= new Singleton();
> > > > >
> > > > >     private Singleton() {
> > > > >         super();
> > > > >     }
> > > > >
> > > > >     public synchronized void writeToFile(String str) {
> > > > >         // Do whatever
> > > > >     }
> > > > >
> > > > >     public Singleton getInstance() {
> > > > >         return inst;
> > > > >     }
> > > > > }
> > > > >
> > > > > Singleton.getInstance().writeToFile("Hello!!");
> > > > >
> > > > > However, I am not sure how well Flink plays with synchronised....
> > > > >
> > > > > Is there a smarter way to do it?
> > > > >
> > > > > Thanks!
> > > > >
> > > > > Andra
> > > > >
> > > >
> > >
> >
>

Re: Monitoring a Flink Job

Posted by Fabian Hueske <fh...@gmail.com>.
Have you tried to use a custom accumulator that just appends to a list?

2015-06-29 12:59 GMT+02:00 Andra Lungu <lu...@gmail.com>:

> Hey Fabian,
>
> I am aware of the way open, preSuperstep(), postSuperstep() etc can help me
> within an interation, unfortunately I am writing my own method here. I
> could try to briefly describe it:
>
> public static final class PropagateNeighborValues implements
> NeighborsFunctionWithVertexValue(...) {
>     @Override
>     public void iterateNeighbors(Iterable..., Collector...) {
>              while(iterator.hasNext) neighbors++;
>              // and I would need something like
>              appendToFile(myAwesomeFile, neighbors);
>     }
> }
>
> Open() and synchronised are definitely not doing the trick for me right
> now.
> Any other way !? :(
>
> On Mon, Jun 29, 2015 at 11:36 AM, Fabian Hueske <fh...@gmail.com> wrote:
>
> > You can measure the time of each iteration in the open() methods
> operators
> > within an iteration. open() will be called before each iteration.
> > The times can be collected by either printing to std out (you need to
> > collect the files then...) or by implementing a list accumulator. Each
> time
> > should include the iteration# und parallel task id.
> > After the execution, the acculuator will be available in the execution
> > result.
> >
> > Accumulators can of course also be used to collect number of messages,
> etc.
> >
> > Best, Fabian
> >
> > 2015-06-29 9:55 GMT+02:00 Flavio Pompermaier <po...@okkam.it>:
> >
> > > Why don't you use Flink dataset output functions (like writeAsText,
> > > writeAsCsv, etc..)?
> > > Or if they are not sufficient you can implement/override your own
> > > InputFormat.
> > >
> > > From what is my experience static variables are evil in distributed
> > > environments..
> > > Moreover, one of the main strengths of Flink are its input/output APIs
> > so I
> > > would avoid to write to a file in that way.
> > >
> > > Of course, dataset.append() will be a very convenient API to add
> (IMHO).
> > >
> > > Best,
> > > Flavio
> > >
> > >
> > > On Sat, Jun 27, 2015 at 4:19 PM, Andra Lungu <lu...@gmail.com>
> > > wrote:
> > >
> > > > Hey guys,
> > > >
> > > > Me again :) So now that my wonderful job finishes, I would like to
> > > monitor
> > > > it a bit (i.e. build some charts on the number of messages per
> vertex,
> > > > compute the total amount of time elapsed per computation per vertex,
> > > etc).
> > > >
> > > > The main computational-intensive operation is a coGroup. There,
> within
> > > the
> > > > iteration I count the number of "messages" sent and then I do simple:
> > > >
> > > > Files.append(messages, messagesTempFile, Charsets.UTF_8);
> > > >
> > > > The problem is that with this approach, I get a deadlock (yes!! Now
> > that
> > > I
> > > > know the code itself works I am positive that the deadlock comes from
> > the
> > > > append -this regarding my previous mail-). It is normal if you come
> to
> > > > think of it 200 something threads are trying to write to the same
> > file...
> > > >
> > > > A possible workaround is this one:
> > > >
> > > > public class Singleton {
> > > >     private static final Singleton inst= new Singleton();
> > > >
> > > >     private Singleton() {
> > > >         super();
> > > >     }
> > > >
> > > >     public synchronized void writeToFile(String str) {
> > > >         // Do whatever
> > > >     }
> > > >
> > > >     public Singleton getInstance() {
> > > >         return inst;
> > > >     }
> > > > }
> > > >
> > > > Singleton.getInstance().writeToFile("Hello!!");
> > > >
> > > > However, I am not sure how well Flink plays with synchronised....
> > > >
> > > > Is there a smarter way to do it?
> > > >
> > > > Thanks!
> > > >
> > > > Andra
> > > >
> > >
> >
>

Re: Monitoring a Flink Job

Posted by Andra Lungu <lu...@gmail.com>.
Hey Fabian,

I am aware of the way open, preSuperstep(), postSuperstep() etc can help me
within an interation, unfortunately I am writing my own method here. I
could try to briefly describe it:

public static final class PropagateNeighborValues implements
NeighborsFunctionWithVertexValue(...) {
    @Override
    public void iterateNeighbors(Iterable..., Collector...) {
             while(iterator.hasNext) neighbors++;
             // and I would need something like
             appendToFile(myAwesomeFile, neighbors);
    }
}

Open() and synchronised are definitely not doing the trick for me right
now.
Any other way !? :(

On Mon, Jun 29, 2015 at 11:36 AM, Fabian Hueske <fh...@gmail.com> wrote:

> You can measure the time of each iteration in the open() methods operators
> within an iteration. open() will be called before each iteration.
> The times can be collected by either printing to std out (you need to
> collect the files then...) or by implementing a list accumulator. Each time
> should include the iteration# und parallel task id.
> After the execution, the acculuator will be available in the execution
> result.
>
> Accumulators can of course also be used to collect number of messages, etc.
>
> Best, Fabian
>
> 2015-06-29 9:55 GMT+02:00 Flavio Pompermaier <po...@okkam.it>:
>
> > Why don't you use Flink dataset output functions (like writeAsText,
> > writeAsCsv, etc..)?
> > Or if they are not sufficient you can implement/override your own
> > InputFormat.
> >
> > From what is my experience static variables are evil in distributed
> > environments..
> > Moreover, one of the main strengths of Flink are its input/output APIs
> so I
> > would avoid to write to a file in that way.
> >
> > Of course, dataset.append() will be a very convenient API to add (IMHO).
> >
> > Best,
> > Flavio
> >
> >
> > On Sat, Jun 27, 2015 at 4:19 PM, Andra Lungu <lu...@gmail.com>
> > wrote:
> >
> > > Hey guys,
> > >
> > > Me again :) So now that my wonderful job finishes, I would like to
> > monitor
> > > it a bit (i.e. build some charts on the number of messages per vertex,
> > > compute the total amount of time elapsed per computation per vertex,
> > etc).
> > >
> > > The main computational-intensive operation is a coGroup. There, within
> > the
> > > iteration I count the number of "messages" sent and then I do simple:
> > >
> > > Files.append(messages, messagesTempFile, Charsets.UTF_8);
> > >
> > > The problem is that with this approach, I get a deadlock (yes!! Now
> that
> > I
> > > know the code itself works I am positive that the deadlock comes from
> the
> > > append -this regarding my previous mail-). It is normal if you come to
> > > think of it 200 something threads are trying to write to the same
> file...
> > >
> > > A possible workaround is this one:
> > >
> > > public class Singleton {
> > >     private static final Singleton inst= new Singleton();
> > >
> > >     private Singleton() {
> > >         super();
> > >     }
> > >
> > >     public synchronized void writeToFile(String str) {
> > >         // Do whatever
> > >     }
> > >
> > >     public Singleton getInstance() {
> > >         return inst;
> > >     }
> > > }
> > >
> > > Singleton.getInstance().writeToFile("Hello!!");
> > >
> > > However, I am not sure how well Flink plays with synchronised....
> > >
> > > Is there a smarter way to do it?
> > >
> > > Thanks!
> > >
> > > Andra
> > >
> >
>

Re: Monitoring a Flink Job

Posted by Fabian Hueske <fh...@gmail.com>.
You can measure the time of each iteration in the open() methods operators
within an iteration. open() will be called before each iteration.
The times can be collected by either printing to std out (you need to
collect the files then...) or by implementing a list accumulator. Each time
should include the iteration# und parallel task id.
After the execution, the acculuator will be available in the execution
result.

Accumulators can of course also be used to collect number of messages, etc.

Best, Fabian

2015-06-29 9:55 GMT+02:00 Flavio Pompermaier <po...@okkam.it>:

> Why don't you use Flink dataset output functions (like writeAsText,
> writeAsCsv, etc..)?
> Or if they are not sufficient you can implement/override your own
> InputFormat.
>
> From what is my experience static variables are evil in distributed
> environments..
> Moreover, one of the main strengths of Flink are its input/output APIs so I
> would avoid to write to a file in that way.
>
> Of course, dataset.append() will be a very convenient API to add (IMHO).
>
> Best,
> Flavio
>
>
> On Sat, Jun 27, 2015 at 4:19 PM, Andra Lungu <lu...@gmail.com>
> wrote:
>
> > Hey guys,
> >
> > Me again :) So now that my wonderful job finishes, I would like to
> monitor
> > it a bit (i.e. build some charts on the number of messages per vertex,
> > compute the total amount of time elapsed per computation per vertex,
> etc).
> >
> > The main computational-intensive operation is a coGroup. There, within
> the
> > iteration I count the number of "messages" sent and then I do simple:
> >
> > Files.append(messages, messagesTempFile, Charsets.UTF_8);
> >
> > The problem is that with this approach, I get a deadlock (yes!! Now that
> I
> > know the code itself works I am positive that the deadlock comes from the
> > append -this regarding my previous mail-). It is normal if you come to
> > think of it 200 something threads are trying to write to the same file...
> >
> > A possible workaround is this one:
> >
> > public class Singleton {
> >     private static final Singleton inst= new Singleton();
> >
> >     private Singleton() {
> >         super();
> >     }
> >
> >     public synchronized void writeToFile(String str) {
> >         // Do whatever
> >     }
> >
> >     public Singleton getInstance() {
> >         return inst;
> >     }
> > }
> >
> > Singleton.getInstance().writeToFile("Hello!!");
> >
> > However, I am not sure how well Flink plays with synchronised....
> >
> > Is there a smarter way to do it?
> >
> > Thanks!
> >
> > Andra
> >
>

Re: Monitoring a Flink Job

Posted by Flavio Pompermaier <po...@okkam.it>.
Why don't you use Flink dataset output functions (like writeAsText,
writeAsCsv, etc..)?
Or if they are not sufficient you can implement/override your own
InputFormat.

>From what is my experience static variables are evil in distributed
environments..
Moreover, one of the main strengths of Flink are its input/output APIs so I
would avoid to write to a file in that way.

Of course, dataset.append() will be a very convenient API to add (IMHO).

Best,
Flavio


On Sat, Jun 27, 2015 at 4:19 PM, Andra Lungu <lu...@gmail.com> wrote:

> Hey guys,
>
> Me again :) So now that my wonderful job finishes, I would like to monitor
> it a bit (i.e. build some charts on the number of messages per vertex,
> compute the total amount of time elapsed per computation per vertex, etc).
>
> The main computational-intensive operation is a coGroup. There, within the
> iteration I count the number of "messages" sent and then I do simple:
>
> Files.append(messages, messagesTempFile, Charsets.UTF_8);
>
> The problem is that with this approach, I get a deadlock (yes!! Now that I
> know the code itself works I am positive that the deadlock comes from the
> append -this regarding my previous mail-). It is normal if you come to
> think of it 200 something threads are trying to write to the same file...
>
> A possible workaround is this one:
>
> public class Singleton {
>     private static final Singleton inst= new Singleton();
>
>     private Singleton() {
>         super();
>     }
>
>     public synchronized void writeToFile(String str) {
>         // Do whatever
>     }
>
>     public Singleton getInstance() {
>         return inst;
>     }
> }
>
> Singleton.getInstance().writeToFile("Hello!!");
>
> However, I am not sure how well Flink plays with synchronised....
>
> Is there a smarter way to do it?
>
> Thanks!
>
> Andra
>