Posted to dev@flink.apache.org by Andra Lungu <lu...@gmail.com> on 2015/03/26 16:46:21 UTC

Memory segment error

Hello everyone,

I guess I need to revive this old discussion:
http://apache-flink-incubator-mailing-list-archive.1008284.n3.nabble.com/Memory-segment-error-when-migrating-functional-code-from-Flink-0-9-to-0-8-td3687.html

At that point, the fix was to kindly ask Alex to make his project work with
0.9.

Now, I am not that lucky!

This is the code:
https://github.com/andralungu/gelly-partitioning/tree/alphaSplit

The main program (NodeSplitting) is working nicely; I get the correct
result. But if you run the test, you will see that collection works and
cluster fails miserably with this exception:

Caused by: java.lang.Exception: The data preparation for task 'Join(Join at
weighEdges(NodeSplitting.java:112)) (04e172e761148a65783a4363406e08c0)' ,
caused an error: Too few memory segments provided. Hash Join needs at least
33 memory segments.
    at
org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:471)
    at
org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:362)
    at
org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:209)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.IllegalArgumentException: Too few memory segments
provided. Hash Join needs at least 33 memory segments.

I am running locally, from IntelliJ, on a tiny graph.
$ cat /proc/meminfo
MemTotal:       11405696 kB
MemFree:         5586012 kB
Buffers:          178100 kB

I am sure I did not run out of memory...

Any thoughts on this?

Thanks!
Andra

Re: Memory segment error

Posted by Fabian Hueske <fh...@gmail.com>.
Robert or Stephan know the Travis setup quite well.
They might know if we can give a bit more than 80MB, but at some point
there will be a hard limit.
Once we have dynamic memory management, (most of) such problems should be
solved.
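
For local experiments (outside the Travis-constrained test environment), one
way to rule the limit out is to hand the local environment a larger managed
memory budget. A minimal sketch, assuming a Flink version where
createLocalEnvironment accepts a custom Configuration and the legacy
taskmanager.memory.size key (value in megabytes) is still honored:

import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.configuration.Configuration;

public class LargerManagedMemoryRun {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Fix the embedded TaskManager's managed memory to 512 MB instead
        // of the default fraction of the JVM heap.
        conf.setInteger("taskmanager.memory.size", 512);

        ExecutionEnvironment env =
                ExecutionEnvironment.createLocalEnvironment(conf);
        // ... build and run the job as usual ...
    }
}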

2015-03-30 23:46 GMT+02:00 Andra Lungu <lu...@gmail.com>:

> Oh! In that case, who should I refer to? :D
> [It's kind of ugly to split this kind of test. I mean if a person is
> counting the degrees, then that's the result that should be tested - at
> least in my opinion]
>
> In any case, thanks for the help :)
>
> On Mon, Mar 30, 2015 at 11:37 PM, Fabian Hueske <fh...@gmail.com> wrote:
>
> > Well, each combiner, reducer, join, coGroup, and solution set needs a
> > share of memory (maps & filters don't).
> > In your case it was pretty much at the edge: the hash joins require 33
> > buffers and got 32. So one memory-consuming operator less might fix it.
> > I did not look in detail at the other job, but it did not seem much more
> > complex than this one. As said before, LOC or the total number of
> > operators is not the important thing here. It's the number of memory
> > consumers.
> >
> > I am not sure how hard the 80MB limit is. Maybe it is possible to
> > increase that a bit.
> >
> > 2015-03-30 23:25 GMT+02:00 Andra Lungu <lu...@gmail.com>:
> >
> > > Hi Fabian,
> > >
> > > I'll see what I can do :).
> > > I am just a bit shocked. If this set of coGroups and joins was too
> > > much for a test case, how come the following worked?
> > >
> > > https://github.com/andralungu/flink/commit/f60b022de056ac259459b68eee6ff0ae9993f0f8
> > >
> > > 400 lines of complex computations :) And I have an even bigger one
> > > for which the test also passed...
> > >
> > > On Mon, Mar 30, 2015 at 2:31 PM, Fabian Hueske <fh...@gmail.com> wrote:
> > >
> > > > Hi Andra,
> > > >
> > > > I found the cause of the exception. Your test case is simply too
> > > > complex for our testing environment.
> > > > We restrict the TM memory for test cases to 80MB in order to execute
> > > > multiple tests in parallel on Travis.
> > > > I counted the memory consumers in your job and got:
> > > >
> > > > - 2 Combine
> > > > - 4 GroupReduce
> > > > - 4 CoGroup
> > > > - 2 Joins
> > > > - 1 SolutionSet
> > > >
> > > > Those are quite a few memory consumers for 20MB per slot (4 slots
> > > > per TM).
> > > >
> > > > Do you see a way to reduce the number of operators in your test
> > > > case, maybe by splitting it in half?
> > > >
> > > > 2015-03-30 11:01 GMT+02:00 Andra Lungu <lu...@gmail.com>:
> > > >
> > > > > Sure,
> > > > >
> > > > > It was in the first mail but that was sent a while ago :)
> > > > >
> > > > > This is the code:
> > > > > https://github.com/andralungu/gelly-partitioning/tree/alphaSplit
> > > > > I also added the log4j file in case it helps!
> > > > >
> > > > > The error is totally reproducible. 2 out of 2 people got the same.
> > > > > Steps to reproduce:
> > > > > 1). Clone the code; switch to the alphaSplit branch
> > > > > 2). Run CounDegreeITCase.java
> > > > >
> > > > > Hope we can get to the bottom of this! If you need something,
> > > > > just ask.
> > > > >
> > > > > On Mon, Mar 30, 2015 at 10:54 AM, Fabian Hueske <fhueske@gmail.com> wrote:
> > > > >
> > > > > > Hmm, that is really weird.
> > > > > > Can you point me to a branch in your repository and the test
> > > > > > case that gives the error?
> > > > > >
> > > > > > Then I'll have a look at it and try to figure out what's going
> > > > > > wrong.
> > > > > >
> > > > > > Cheers, Fabian
> > > > > >
> > > > > > 2015-03-30 10:43 GMT+02:00 Andra Lungu <lu...@gmail.com>:
> > > > > >
> > > > > > > Hello,
> > > > > > >
> > > > > > > I went on and did some further debugging on this issue. Even
> > > > > > > though the exception said that the problem comes from here:
> > > > > > >
> > > > > > > 4837 [Join(Join at weighEdges(NodeSplitting.java:117)) (1/4)] ERROR
> > > > > > > org.apache.flink.runtime.operators.RegularPactTask  - Error in task code:
> > > > > > > Join(Join at weighEdges(NodeSplitting.java:117)) (1/4)
> > > > > > > java.lang.Exception: The data preparation for task 'Join(Join at
> > > > > > > weighEdges(NodeSplitting.java:117))' , caused an error: Too few memory
> > > > > > > segments provided. Hash Join needs at least 33 memory segments.
> > > > > > >     at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:471)
> > > > > > >     at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:362)
> > > > > > >     at org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:217)
> > > > > > >     at java.lang.Thread.run(Thread.java:745)
> > > > > > >
> > > > > > > which is basically a chain of two joins, a schema that I have
> > > > > > > repeated several times, including in the getTriplets() method,
> > > > > > > and it passed every time. I thought that this could not be
> > > > > > > right!
> > > > > > >
> > > > > > > So I picked each intermediate data set formed, printed it and
> > > > > > > added a System.exit(0) afterwards. The exception comes from
> > > > > > > this method: aggregatePartialValuesSplitVertices. Even though
> > > > > > > this computes the correct result, it then throws the memory
> > > > > > > segment exception (!!!!!! Just for the cluster test -
> > > > > > > everything else works).
> > > > > > >
> > > > > > > The code in the function is:
> > > > > > >
> > > > > > > private static DataSet<Vertex<String, Long>> aggregatePartialValuesSplitVertices(
> > > > > > >       DataSet<Vertex<String, Long>> resultedVertices) {
> > > > > > >
> > > > > > >    return resultedVertices.flatMap(
> > > > > > >          new FlatMapFunction<Vertex<String, Long>, Vertex<String, Long>>() {
> > > > > > >
> > > > > > >       @Override
> > > > > > >       public void flatMap(Vertex<String, Long> vertex,
> > > > > > >             Collector<Vertex<String, Long>> collector) throws Exception {
> > > > > > >          int pos = vertex.getId().indexOf("_");
> > > > > > >
> > > > > > >          // if there is a split vertex
> > > > > > >          if (pos > -1) {
> > > > > > >             collector.collect(new Vertex<String, Long>(
> > > > > > >                   vertex.getId().substring(0, pos), vertex.getValue()));
> > > > > > >          } else {
> > > > > > >             collector.collect(vertex);
> > > > > > >          }
> > > > > > >       }
> > > > > > >    }).groupBy(0).reduceGroup(
> > > > > > >          new GroupReduceFunction<Vertex<String, Long>, Vertex<String, Long>>() {
> > > > > > >
> > > > > > >       @Override
> > > > > > >       public void reduce(Iterable<Vertex<String, Long>> iterable,
> > > > > > >             Collector<Vertex<String, Long>> collector) throws Exception {
> > > > > > >          long sum = 0;
> > > > > > >          Vertex<String, Long> vertex = new Vertex<String, Long>();
> > > > > > >
> > > > > > >          Iterator<Vertex<String, Long>> iterator = iterable.iterator();
> > > > > > >          while (iterator.hasNext()) {
> > > > > > >             vertex = iterator.next();
> > > > > > >             sum += vertex.getValue();
> > > > > > >          }
> > > > > > >
> > > > > > >          collector.collect(new Vertex<String, Long>(vertex.getId(), sum));
> > > > > > >       }
> > > > > > >    });
> > > > > > > }
> > > > > > >
> > > > > > > To me, nothing seems out of the ordinary here. This is regular
> > > > > > > user code. And the behaviour in the end is definitely not the
> > > > > > > one expected. Any idea why this might be happening?
> > > > > > >
> > > > > > > Thanks!
> > > > > > > Andra
> > > > > > >
> > > > > > > On Fri, Mar 27, 2015 at 12:08 AM, Andra Lungu <lungu.andra@gmail.com> wrote:
> > > > > > >
> > > > > > > > Oops! Sorry! Did not know the mailing list does not support
> > > > > > > > attachments :)
> > > > > > > > https://gist.github.com/andralungu/fba36d77f79189daa183
> > > > > > > >
> > > > > > > > On Fri, Mar 27, 2015 at 12:02 AM, Andra Lungu <lungu.andra@gmail.com> wrote:
> > > > > > > >
> > > > > > > >> Hi Fabian,
> > > > > > > >>
> > > > > > > >> I uploaded a file with my execution plan.
> > > > > > > >>
> > > > > > > >> On Thu, Mar 26, 2015 at 11:50 PM, Fabian Hueske <fhueske@gmail.com> wrote:
> > > > > > > >>
> > > > > > > >>> Hi Andra,
> > > > > > > >>>
> > > > > > > >>> the error is independent of the size of the data set. A
> > > > > > > >>> HashTable needs at least 33 memory pages to operate.
> > > > > > > >>> Since you have 820MB of managed memory and the size of a
> > > > > > > >>> memory page is 32KB, there should be more than 25k pages
> > > > > > > >>> available.
> > > > > > > >>>
> > > > > > > >>> Can you post the execution plan of the program you execute (
> > > > > > > >>> ExecutionEnvironment.getExecutionPlan() )?
> > > > > > > >>>
> > > > > > > >>> Best, Fabian
> > > > > > > >>>
> > > > > > > >>> 2015-03-26 23:31 GMT+01:00 Andra Lungu <lungu.andra@gmail.com>:
> > > > > > > >>>
> > > > > > > >>> > For 20 edges and 5 nodes, that should be more than enough.
> > > > > > > >>> >
> > > > > > > >>> > On Thu, Mar 26, 2015 at 11:24 PM, Andra Lungu <lungu.andra@gmail.com> wrote:
> > > > > > > >>> >
> > > > > > > >>> > > Sure,
> > > > > > > >>> > >
> > > > > > > >>> > > 3470 [main] INFO  org.apache.flink.runtime.taskmanager.TaskManager  -
> > > > > > > >>> > > Using 820 MB for Flink managed memory.
> > > > > > > >>> > >
> > > > > > > >>> > > On Thu, Mar 26, 2015 at 4:48 PM, Robert Metzger <rmetzger@apache.org> wrote:
> > > > > > > >>> > >
> > > > > > > >>> > >> Hi,
> > > > > > > >>> > >>
> > > > > > > >>> > >> during startup, Flink will log something like:
> > > > > > > >>> > >> 16:48:09,669 INFO  org.apache.flink.runtime.taskmanager.TaskManager
> > > > > > > >>> > >>      - Using 1193 MB for Flink managed memory.
> > > > > > > >>> > >>
> > > > > > > >>> > >> Can you tell us how much memory Flink is managing in
> > > > > > > >>> > >> your case?
> > > > > > > >>> > >>
> > > > > > > >>> > >> On Thu, Mar 26, 2015 at 4:46 PM, Andra Lungu <lungu.andra@gmail.com> wrote:
> > > > > > > >>> > >>
> > > > > > > >>> > >> > Hello everyone,
> > > > > > > >>> > >> >
> > > > > > > >>> > >> > I guess I need to revive this old discussion:
> > > > > > > >>> > >> > http://apache-flink-incubator-mailing-list-archive.1008284.n3.nabble.com/Memory-segment-error-when-migrating-functional-code-from-Flink-0-9-to-0-8-td3687.html
> > > > > > > >>> > >> >
> > > > > > > >>> > >> > At that point, the fix was to kindly ask Alex to
> > > > > > > >>> > >> > make his project work with 0.9.
> > > > > > > >>> > >> >
> > > > > > > >>> > >> > Now, I am not that lucky!
> > > > > > > >>> > >> >
> > > > > > > >>> > >> > This is the code:
> > > > > > > >>> > >> > https://github.com/andralungu/gelly-partitioning/tree/alphaSplit
> > > > > > > >>> > >> >
> > > > > > > >>> > >> > The main program (NodeSplitting) is working nicely;
> > > > > > > >>> > >> > I get the correct result. But if you run the test,
> > > > > > > >>> > >> > you will see that collection works and cluster
> > > > > > > >>> > >> > fails miserably with this exception:
> > > > > > > >>> > >> >
> > > > > > > >>> > >> > Caused by: java.lang.Exception: The data preparation for task 'Join(Join at
> > > > > > > >>> > >> > weighEdges(NodeSplitting.java:112)) (04e172e761148a65783a4363406e08c0)' ,
> > > > > > > >>> > >> > caused an error: Too few memory segments provided. Hash Join needs at least
> > > > > > > >>> > >> > 33 memory segments.
> > > > > > > >>> > >> >     at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:471)
> > > > > > > >>> > >> >     at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:362)
> > > > > > > >>> > >> >     at org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:209)
> > > > > > > >>> > >> >     at java.lang.Thread.run(Thread.java:745)
> > > > > > > >>> > >> > Caused by: java.lang.IllegalArgumentException: Too few memory segments
> > > > > > > >>> > >> > provided. Hash Join needs at least 33 memory segments.
> > > > > > > >>> > >> >
> > > > > > > >>> > >> > I am running locally, from IntelliJ, on a tiny graph.
> > > > > > > >>> > >> > $ cat /proc/meminfo
> > > > > > > >>> > >> > MemTotal:       11405696 kB
> > > > > > > >>> > >> > MemFree:         5586012 kB
> > > > > > > >>> > >> > Buffers:          178100 kB
> > > > > > > >>> > >> >
> > > > > > > >>> > >> > I am sure I did not run out of memory...
> > > > > > > >>> > >> >
> > > > > > > >>> > >> > Any thoughts on this?
> > > > > > > >>> > >> >
> > > > > > > >>> > >> > Thanks!
> > > > > > > >>> > >> > Andra
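
For reference, Fabian's getExecutionPlan() request above can be answered with
a few lines. A minimal, self-contained sketch (the data set and sink path are
illustrative; a sink must be defined, but the job itself is not executed):

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;

public class PrintPlan {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSet<Long> data = env.generateSequence(1, 100);
        data.writeAsText("/tmp/plan-demo");  // plan extraction needs a sink

        // Prints the optimizer's plan as JSON without running the job;
        // this is the kind of output attached as the gist above.
        System.out.println(env.getExecutionPlan());
    }
}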

Re: Memory segment error

Posted by Andra Lungu <lu...@gmail.com>.
Oh! In that case, who should I refer to? :D
[It's kind of ugly to split this kind of test. I mean if a person is
counting the degrees, then that's the result that should be tested - at
least in my opinion]

In any case, thanks for the help :)

Re: Memory segment error

Posted by Fabian Hueske <fh...@gmail.com>.
Well, each combiner, reducer, join, coGroup, and solution set needs a share
of memory (maps & filters don't).
In your case it was pretty much at the edge: the hash joins require 33
buffers and got 32. So one memory-consuming operator less might fix it.
I did not look in detail at the other job, but it did not seem much more
complex than this one. As said before, LOC or the total number of operators
is not the important thing here. It's the number of memory consumers.

I am not sure how hard the 80MB limit is. Maybe it is possible to increase
that a bit.
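
If removing an operator is not practical, another angle (a sketch, not
verified against this particular job, and assuming a Flink version whose
DataSet API accepts join hints) is to steer the optimizer away from the
hash-based join strategy, since the sort-merge strategy does not have the
33-page minimum:

import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;

public class SortMergeJoinHint {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Toy stand-ins for the two inputs joined in weighEdges().
        DataSet<Tuple2<String, Long>> left = env.fromElements(
                new Tuple2<String, Long>("a", 1L), new Tuple2<String, Long>("b", 2L));
        DataSet<Tuple2<String, Long>> right = env.fromElements(
                new Tuple2<String, Long>("a", 10L), new Tuple2<String, Long>("b", 20L));

        // REPARTITION_SORT_MERGE avoids building an in-memory hash table.
        // print() triggers execution on recent versions; older ones need
        // an explicit env.execute().
        left.join(right, JoinHint.REPARTITION_SORT_MERGE)
            .where(0)
            .equalTo(0)
            .print();
    }
}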

Re: Memory segment error

Posted by Andra Lungu <lu...@gmail.com>.
Hi Fabian,

I'll see what I can do :).
I am just a bit shocked. If this set of coGroups and joins was too much for
a test case, how come the following worked?

https://github.com/andralungu/flink/commit/f60b022de056ac259459b68eee6ff0ae9993f0f8

400 lines of complex computations :) And I have an even bigger one for
which the test also passed...


Re: Memory segment error

Posted by Fabian Hueske <fh...@gmail.com>.
Hi Andra,

I found the cause of the exception. Your test case is simply too complex
for our testing environment.
We restrict the TM memory for test cases to 80MB in order to execute
multiple tests in parallel on Travis.
I counted the memory consumers in your job and got:

- 2 Combine
- 4 GroupReduce
- 4 CoGroup
- 2 Joins
- 1 SolutionSet

Those are quite a few memory consumers for 20MB per slot (4 slots per TM).

Do you see a way to reduce the number of operators in your test case, maybe
by splitting it in half?
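
A minimal sketch of the splitting idea, assuming DataSet#collect() is
available in the version used; the operators here are toy stand-ins for the
two halves of the real pipeline:

import java.util.List;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;

public class SplitTestSketch {
    public static void main(String[] args) throws Exception {
        // First half: run part of the pipeline and materialize its result.
        ExecutionEnvironment env1 = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<Tuple2<String, Long>> firstHalf = env1
                .fromElements(new Tuple2<String, Long>("a", 1L),
                        new Tuple2<String, Long>("a", 2L))
                .groupBy(0).sum(1);
        List<Tuple2<String, Long>> intermediate = firstHalf.collect();

        // Second half: a fresh program starting from the materialized
        // values, so each job holds far fewer memory consumers at once.
        ExecutionEnvironment env2 = ExecutionEnvironment.getExecutionEnvironment();
        env2.fromCollection(intermediate)
                .groupBy(0).sum(1)
                .print();
    }
}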


Re: Memory segment error

Posted by Andra Lungu <lu...@gmail.com>.
Sure,

It was in the first mail but that was sent a while ago :)

This is the code:
https://github.com/andralungu/gelly-partitioning/tree/alphaSplit
I also added the log4j file in case it helps!

The error is fully reproducible; two out of two people who tried got the
same exception. Steps to reproduce:
1) Clone the code and switch to the alphaSplit branch.
2) Run CounDegreeITCase.java.

Hope we can get to the bottom of this! If you need something, just ask.



Re: Memory segment error

Posted by Fabian Hueske <fh...@gmail.com>.
Hmm, that is really weird.
Can you point me to a branch in your repository and the test case that
gives the error?

Then I'll have a look at it and try to figure out what's going wrong.

Cheers, Fabian


Re: Memory segment error

Posted by Andra Lungu <lu...@gmail.com>.
Hello,

I went on and did some further debugging on this issue. Even though the
exception said that the problem comes from here:
4837 [Join(Join at* weighEdges(NodeSplitting.java:117)*) (1/4)] ERROR
org.apache.flink.runtime.operators.RegularPactTask  - Error in task code:
Join(Join at weighEdges(NodeSplitting.java:117)) (1/4)
java.lang.Exception: The data preparation for task 'Join(Join at
weighEdges(NodeSplitting.java:117))' , caused an error: Too few memory
segments provided. Hash Join needs at least 33 memory segments.
    at
org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:471)
    at
org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:362)
    at
org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:217)
    at java.lang.Thread.run(Thread.java:745)

which is basically a chain of two joins, a pattern I have repeated
several times (including in the getTriplets() method), and which passed
every time. I thought that this could not be right!

So I took each intermediate data set that is formed, printed it, and added
a System.exit(0) afterwards. The exception comes from this method:
aggregatePartialValuesSplitVertices. Even though it computes the correct
result, it then throws the memory segment exception (and only in the
cluster test; everything else works).

The code in the function, with its imports, is:

import java.util.Iterator;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.graph.Vertex;
import org.apache.flink.util.Collector;

private static DataSet<Vertex<String, Long>> aggregatePartialValuesSplitVertices(
      DataSet<Vertex<String, Long>> resultedVertices) {

   return resultedVertices.flatMap(new FlatMapFunction<Vertex<String, Long>,
         Vertex<String, Long>>() {

      @Override
      public void flatMap(Vertex<String, Long> vertex,
            Collector<Vertex<String, Long>> collector) throws Exception {
         int pos = vertex.getId().indexOf("_");

         // if this is a split vertex, emit it under its original id
         if (pos > -1) {
            collector.collect(new Vertex<String, Long>(
                  vertex.getId().substring(0, pos), vertex.getValue()));
         } else {
            collector.collect(vertex);
         }
      }
   }).groupBy(0).reduceGroup(new GroupReduceFunction<Vertex<String, Long>,
         Vertex<String, Long>>() {

      @Override
      public void reduce(Iterable<Vertex<String, Long>> iterable,
            Collector<Vertex<String, Long>> collector) throws Exception {
         // sum up the partial values of all vertices that share an id
         long sum = 0;
         Vertex<String, Long> vertex = new Vertex<String, Long>();

         Iterator<Vertex<String, Long>> iterator = iterable.iterator();
         while (iterator.hasNext()) {
            vertex = iterator.next();
            sum += vertex.getValue();
         }

         collector.collect(new Vertex<String, Long>(vertex.getId(), sum));
      }
   });
}

To me, nothing seems out of the ordinary here. This is regular user code.
And the behaviour in the end is definitely not the one expected. Any idea
why this might be happening?

Thanks!
Andra
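
As a side note, the flatMap + groupReduce pair above can also be expressed
with a pairwise ReduceFunction. This is only a sketch under the same types
(the hypothetical sumPartialValues takes the output of the flatMap); it is
not claimed to need fewer memory consumers:

import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.graph.Vertex;

private static DataSet<Vertex<String, Long>> sumPartialValues(
      DataSet<Vertex<String, Long>> mergedVertices) {

   return mergedVertices.groupBy(0).reduce(
         new ReduceFunction<Vertex<String, Long>>() {

      @Override
      public Vertex<String, Long> reduce(Vertex<String, Long> first,
            Vertex<String, Long> second) throws Exception {
         // after groupBy(0), both vertices carry the same id
         return new Vertex<String, Long>(first.getId(),
               first.getValue() + second.getValue());
      }
   });
}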


Re: Memory segment error

Posted by Andra Lungu <lu...@gmail.com>.
Oops! Sorry! I did not know the mailing list does not support attachments :)
https://gist.github.com/andralungu/fba36d77f79189daa183


Re: Memory segment error

Posted by Andra Lungu <lu...@gmail.com>.
Hi Fabian,

I uploaded a file with my execution plan.


Re: Memory segment error

Posted by Fabian Hueske <fh...@gmail.com>.
Hi Andra,

the error is independent of the size of the data set. A HashTable needs at
least 33 memory pages to operate.
Since you have 820MB of managed memory and the size of a memory page is
32KB, there should be more than 25k pages available.

Can you post the execution plan of the program you execute (
ExecutionEnvironment.getExecutionPlan() )?
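
A minimal sketch of how to dump that plan: build the program as usual, but
call getExecutionPlan() in place of execute():

import org.apache.flink.api.java.ExecutionEnvironment;

public class DumpPlan {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env =
                ExecutionEnvironment.getExecutionEnvironment();

        // ... build the program here: sources, joins, sinks ...

        // prints the JSON execution plan instead of running the job
        System.out.println(env.getExecutionPlan());
    }
}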

Best, Fabian


Re: Memory segment error

Posted by Andra Lungu <lu...@gmail.com>.
For 20 edges and 5 nodes, that should be more than enough.


Re: Memory segment error

Posted by Andra Lungu <lu...@gmail.com>.
Sure,

3470 [main] INFO  org.apache.flink.runtime.taskmanager.TaskManager  - Using
820 MB for Flink managed memory.


Re: Memory segment error

Posted by Robert Metzger <rm...@apache.org>.
Hi,

during startup, Flink will log something like:
16:48:09,669 INFO  org.apache.flink.runtime.taskmanager.TaskManager
     - Using 1193 MB for Flink managed memory.

Can you tell us how much memory Flink is managing in your case?
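
In case it helps for local experiments, a sketch of pinning that value. It
assumes the 0.9-era taskmanager.memory.size key
(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY) and the
createLocalEnvironment(Configuration) factory:

import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;

public class LocalManagedMemory {

    public static void main(String[] args) {
        Configuration conf = new Configuration();
        // assumption: fixes managed memory at 512 MB instead of the
        // default fraction of the JVM heap
        conf.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 512);

        ExecutionEnvironment env =
                ExecutionEnvironment.createLocalEnvironment(conf);

        // ... build and run the program against this environment ...
    }
}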



On Thu, Mar 26, 2015 at 4:46 PM, Andra Lungu <lu...@gmail.com> wrote:

> Hello everyone,
>
> I guess I need to revive this old discussion:
>
> http://apache-flink-incubator-mailing-list-archive.1008284.n3.nabble.com/Memory-segment-error-when-migrating-functional-code-from-Flink-0-9-to-0-8-td3687.html
>
> At that point, the fix was to kindly ask Alex to make his project work with
> 0.9.
>
> Now, I am not that lucky!
>
> This is the code:
> https://github.com/andralungu/gelly-partitioning/tree/alphaSplit
>
> The main program(NodeSplitting) is working nicely, I get the correct
> result. But if you run the test,  you will see that collection works and
> cluster fails miserably with this exception:
>
> Caused by: java.lang.Exception: The data preparation for task 'Join(Join at
> weighEdges(NodeSplitting.java:112)) (04e172e761148a65783a4363406e08c0)' ,
> caused an error: Too few memory segments provided. Hash Join needs at least
> 33 memory segments.
>     at
>
> org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:471)
>     at
>
> org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:362)
>     at
>
> org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:209)
>     at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.IllegalArgumentException: Too few memory segments
> provided. Hash Join needs at least 33 memory segments.
>
> I am running locally, from IntelliJ, on a tiny graph.
> $ cat /proc/meminfo
> MemTotal:       11405696 kB
> MemFree:         5586012 kB
> Buffers:          178100 kB
>
> I am sure I did not run out of memory...
>
> Any thoughts on this?
>
> Thanks!
> Andra
>