Posted to user@flink.apache.org by Sebastian <ss...@googlemail.com> on 2015/02/20 11:30:50 UTC

OutOfMemory during serialization

Hi,

I get a strange out of memory error from the serialization code when I 
try to run the following program:

def compute(trackingGraphFile: String, domainIndexFile: String,
    outputPath: String) = {

  implicit val env = ExecutionEnvironment.getExecutionEnvironment

  val edges = GraphUtils.readEdges(trackingGraphFile)
  val domains = GraphUtils.readVertices(domainIndexFile)

  val domainsByCompany = DomainsByCompany.mapping
  val companyEdges = edges
    .filter { edge => domainsByCompany.contains(edge.src.toInt) }
    .map { edge => domainsByCompany(edge.src.toInt) -> edge.target.toInt }
    .distinct

  val companyBitMaps = companyEdges.groupBy(0).reduceGroup {
    domainsByCompany: Iterator[(String,Int)] =>

      var company = ""
      val seenAt = new util.BitSet(42889800)

      for ((name, domain) <- domainsByCompany) {
        company = name
        seenAt.set(domain)
      }

      company -> seenAt
  }

  companyBitMaps.print()

  env.execute()
}


The error looks as follows:


2015-02-20 11:22:54 INFO  JobClient:345 - java.lang.OutOfMemoryError: Java heap space
	at org.apache.flink.runtime.io.network.serialization.DataOutputSerializer.resize(DataOutputSerializer.java:249)
	at org.apache.flink.runtime.io.network.serialization.DataOutputSerializer.write(DataOutputSerializer.java:93)
	at org.apache.flink.api.java.typeutils.runtime.DataOutputViewStream.write(DataOutputViewStream.java:39)
	at com.esotericsoftware.kryo.io.Output.flush(Output.java:163)
	at com.esotericsoftware.kryo.io.Output.require(Output.java:142)
	at com.esotericsoftware.kryo.io.Output.writeBoolean(Output.java:613)
	at com.twitter.chill.java.BitSetSerializer.write(BitSetSerializer.java:42)
	at com.twitter.chill.java.BitSetSerializer.write(BitSetSerializer.java:29)
	at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:599)
	at org.apache.flink.api.java.typeutils.runtime.KryoSerializer.serialize(KryoSerializer.java:155)
	at org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:91)
	at org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:30)
	at org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:51)
	at org.apache.flink.runtime.io.network.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:76)
	at org.apache.flink.runtime.io.network.api.RecordWriter.emit(RecordWriter.java:82)
	at org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:88)
	at org.apache.flink.api.scala.GroupedDataSet$$anon$2.reduce(GroupedDataSet.scala:262)
	at org.apache.flink.runtime.operators.GroupReduceDriver.run(GroupReduceDriver.java:124)
	at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:493)
	at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:360)
	at org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:257)
	at java.lang.Thread.run(Thread.java:745)

I run the job locally, giving 2 GB of RAM to the VM. The code will 
produce fewer than 10 groups, and the bitsets used internally should not 
be larger than a few megabytes.

Any tips on how to fix this?

Best,
Sebastian

PS: Still waiting for a reduceGroup that gives me the key ;)




Re: Optimizing degree of parallelism

Posted by Fabian Hueske <fh...@gmail.com>.
Just to clarify.
The pull request 410 does not optimize the degree of parallelism of
operators with respect to performance / gain.
It just sets the maximum possible parallelization for the current execution
environment.

Cheers, Fabian

2015-03-02 17:20 GMT+01:00 Max Michels <mx...@apache.org>:

> Hi!
>
> There is a pending pull request for this feature. If that is what you
> had in mind:
>
> https://github.com/apache/flink/pull/410
>
> Best regards,
> Max

Re: Optimizing degree of parallelism

Posted by Max Michels <mx...@apache.org>.
Hi!

There is a pending pull request for this feature. If that is what you
had in mind:

https://github.com/apache/flink/pull/410

Best regards,
Max

On Mon, Mar 2, 2015 at 5:11 PM, Alexander Alexandrov
<al...@gmail.com> wrote:
> AFAIK at the moment this is not supported but at the TU Berlin we have a
> master student working on this feature, so it might be possible within the
> next 3-6 months.
>
> Regards,
> Alexander

Re: Optimizing degree of parallelism

Posted by Alexander Alexandrov <al...@gmail.com>.
AFAIK at the moment this is not supported but at the TU Berlin we have a
master student working on this feature, so it might be possible within the
next 3-6 months.

Regards,
Alexander

2015-03-02 17:01 GMT+01:00 Malte Schwarzer <ms...@mieo.de>:

> Hi everyone,
>
> I read that Flink is supposed to automatically optimize the degree of
> parallelism. But I never saw any change of parallelism in the web interface
> without defining dop manually (-p parameter).
>
> Is there any of this optimization actually happening? Or how can I switch
> it on?
>
>
> Cheers
> Malte
>

Optimizing degree of parallelism

Posted by Malte Schwarzer <ms...@mieo.de>.
Hi everyone,

I read that Flink is supposed to automatically optimize the degree of
parallelism. But I never saw any change of parallelism in the web interface
without defining dop manually (-p parameter).

Is there any of this optimization actually happening? Or how can I switch it
on?


Cheers
Malte



Re: OutOfMemory during serialization

Posted by Robert Metzger <rm...@apache.org>.
Twitter has merged my improved BitSetSerializer for Kryo:
https://github.com/twitter/chill/pull/220
Once they've released a new version, I'll update our twitter-chill
dependency.
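
A minimal sketch of what a more compact encoding can look like, assuming the goal is simply to write the bit set as its backing 64-bit words instead of one boolean per bit. CompactBitSetCodec is a hypothetical name, the sketch uses plain java.nio rather than Kryo, and it is not the code from the chill pull request linked above.

```java
import java.nio.ByteBuffer;
import java.util.BitSet;

// Hypothetical codec for illustration only; not the serializer from the chill pull request.
final class CompactBitSetCodec {

    /** Encodes the set as a word count followed by its backing 64-bit words. */
    static byte[] encode(BitSet bits) {
        long[] words = bits.toLongArray();
        ByteBuffer buf = ByteBuffer.allocate(4 + 8 * words.length);
        buf.putInt(words.length);
        for (long word : words) {
            buf.putLong(word);
        }
        return buf.array();
    }

    /** Rebuilds the set from the layout produced by encode. */
    static BitSet decode(byte[] data) {
        ByteBuffer buf = ByteBuffer.wrap(data);
        long[] words = new long[buf.getInt()];
        for (int i = 0; i < words.length; i++) {
            words[i] = buf.getLong();
        }
        return BitSet.valueOf(words);
    }

    public static void main(String[] args) {
        BitSet seenAt = new BitSet(42889800);
        seenAt.set(7);
        seenAt.set(42889799);
        byte[] encoded = encode(seenAt);
        System.out.println("encoded bytes: " + encoded.length);
        System.out.println("round trip ok: " + decode(encoded).equals(seenAt));
    }
}
```

With this layout the encoded size depends on the highest set bit (about one byte per eight bit positions), not on the number of bit positions written individually.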

On Fri, Feb 20, 2015 at 2:13 PM, Robert Metzger <rm...@apache.org> wrote:

> Let's create an issue in Flink to fix this.
>
> Let's a) see if the new serializer registration in 0.9 allows users to
> replace serializers that have already been set by chill,
> and b) fix the issue in twitter/chill.
> I think we can ask them to release a new version with the fix (they seem
> to release quite often). Also, I have had good experiences contributing to
> twitter/chill.
>
> On Fri, Feb 20, 2015 at 2:02 PM, Ufuk Celebi <uc...@apache.org> wrote:
>
>> I've just looked into the BitSetSerializer of Chill. And it seems to be
>> true that each bit is encoded as a boolean (for all bit positions <=
>> "logical" length).
>>
>> Regarding the DataOutputSerializer: it would help to catch OutOfMemoryErrors
>> during resize operations and rethrow them with a more detailed message (the
>> current buffer size and the requested size after the resize).

Re: OutOfMemory during serialization

Posted by Robert Metzger <rm...@apache.org>.
Let's create an issue in Flink to fix this.

Let's a) see if the new serializer registration in 0.9 allows users to
replace serializers that have already been set by chill,
and b) fix the issue in twitter/chill.
I think we can ask them to release a new version with the fix (they seem to
release quite often). Also, I have had good experiences contributing to
twitter/chill.

On Fri, Feb 20, 2015 at 2:02 PM, Ufuk Celebi <uc...@apache.org> wrote:

> I've just looked into the BitSetSerializer of Chill. And it seems to be
> true that each bit is encoded as a boolean (for all bit positions <=
> "logical" length).
>
> Regarding the DataOutputSerializer: it would help to catch OutOfMemoryErrors
> during resize operations and rethrow them with a more detailed message (the
> current buffer size and the requested size after the resize).

Re: OutOfMemory during serialization

Posted by Ufuk Celebi <uc...@apache.org>.
I've just looked into the BitSetSerializer of Chill. And it seems to be true that each bit is encoded as a boolean (for all bit positions <= "logical" length).
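
To put rough numbers on this, here is a back-of-the-envelope estimate, assuming the serializer really writes one byte for every bit position below the logical length and ignoring any length prefix. BitSetSizeEstimate is a hypothetical helper for illustration, not part of Flink or Chill.

```java
import java.util.BitSet;

// Hypothetical helper for illustration; not part of Flink or Chill.
final class BitSetSizeEstimate {

    /** Bytes written if every bit position below length() becomes a one-byte boolean. */
    static long bytesAsBooleans(BitSet bits) {
        return bits.length();
    }

    /** Bytes written if the set is stored as its backing 64-bit words. */
    static long bytesAsWords(BitSet bits) {
        return 8L * bits.toLongArray().length;
    }

    public static void main(String[] args) {
        BitSet seenAt = new BitSet(42889800);
        seenAt.set(42889799); // assumed worst case: the highest domain id is present
        System.out.println("one byte per bit: " + bytesAsBooleans(seenAt) + " bytes");
        System.out.println("packed words:     " + bytesAsWords(seenAt) + " bytes");
    }
}
```

For a set sized like the one in the original program, with the highest bit set, that is roughly 41 MiB per record in the one-byte-per-bit form versus roughly 5 MiB as packed words.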

Regarding the DataOutputSerializer: it would help to catch OutOfMemoryErrors during resize operations and rethrow them with a more detailed message (the current buffer size and the requested size after the resize).
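
A sketch of that idea, assuming a simple growable byte buffer. GrowableBuffer and its injectable allocator are hypothetical and exist only so the failure path can be exercised; this is not Flink's DataOutputSerializer, and integer overflow handling for very large sizes is left out.

```java
import java.util.function.IntFunction;

// Hypothetical growable buffer for illustration; this is not Flink's DataOutputSerializer.
final class GrowableBuffer {
    private final IntFunction<byte[]> allocator; // injectable so a failed allocation can be simulated
    private byte[] buffer;
    private int position;

    GrowableBuffer(int initialSize, IntFunction<byte[]> allocator) {
        this.allocator = allocator;
        this.buffer = allocator.apply(initialSize);
    }

    /** Appends src, growing the buffer first if it does not fit. */
    void write(byte[] src) {
        if (position + src.length > buffer.length) {
            resize(Math.max(buffer.length * 2, position + src.length));
        }
        System.arraycopy(src, 0, buffer, position, src.length);
        position += src.length;
    }

    /** Allocates a larger array; on failure, rethrows with the sizes involved. */
    private void resize(int newSize) {
        byte[] bigger;
        try {
            bigger = allocator.apply(newSize);
        } catch (OutOfMemoryError e) {
            OutOfMemoryError detailed = new OutOfMemoryError(
                "Failed to grow serialization buffer from " + buffer.length
                    + " to " + newSize + " bytes (" + position + " bytes in use)");
            detailed.initCause(e);
            throw detailed;
        }
        System.arraycopy(buffer, 0, bigger, 0, position);
        buffer = bigger;
    }

    int size() {
        return position;
    }
}
```

In normal use the allocator would just be `byte[]::new`; the point is only that the rethrown error names the old size, the requested size, and how much data was already buffered.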

On 20 Feb 2015, at 13:22, Stephan Ewen <se...@apache.org> wrote:

> What happens (in the original stack trace) is the following: The serializer starts producing the byte stream data and we buffer it, to determine the length, before sending it over the network. While buffering that data, the memory runs out.
> 
> It may be that you are simply short of memory; it may also be that the serializer (here Chill's Kryo BitSetSerializer) is extremely inefficient in terms of space. It seems that it tries to write a boolean (coded as one byte) per bit. That is blowing up your bitset quite a bit.
> 
> A solution may also be to register a better bitset serializer. Chill's default one seems to be sort of inefficient...
> 
> 
> 
> 
> 
> On Fri, Feb 20, 2015 at 1:03 PM, Sebastian <ss...@googlemail.com> wrote:
> I don't have a build unfortunately, I'm using the maven dependency. I'll try to find a workaround. Thx for your help.
> 
> -s
> 
> On 20.02.2015 12:44, Robert Metzger wrote:
> Hey Sebastian,
> 
> I've fixed the issue in this branch:
> https://github.com/rmetzger/flink/tree/flink1589:
> 
> Configuration c = new Configuration();
> c.setFloat(ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY, 0.5f);
> final ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(c);
> 
> 
> I'll also backport the fix to the release-0.8 branch to make it
> available in the 0.8.2 release.
> 
> Maybe you can easily cherry-pick the commit to your 0.8.1 Flink build.
> 
> 
> Best,
> Robert
> 
> On Fri, Feb 20, 2015 at 12:11 PM, Robert Metzger <rmetzger@apache.org> wrote:
> 
>     Hi Sebastian,
> 
>     Looks like you've found a limitation of Flink.
>     I've already filed two JIRAs to resolve the issue
>     (https://issues.apache.org/jira/browse/FLINK-1588,
>     https://issues.apache.org/jira/browse/FLINK-1589).
> 
>     I don't know your setup. When you use Flink just as a dependency
>     without a version being checked out, there is probably no way right
>     now to change the configuration settings.
>     In that case, you have to start a local cluster yourself
>     (./bin/start-local.sh (+ your settings in conf/flink-conf.yaml)).
>     You can then submit your job either with ./bin/flink or using the
>     RemoteExecutionEnvironment (ExecutionEnvironment.createRemoteEnvironment()).
> 
>     If you have the Flink source checked out, you can also hard-code the
>     configuration values into org.apache.flink.client.LocalExecutor.
> 
> 
>     By the way, Flink 0.8.1 is now available on maven central (I suspect
>     you had to build it yourself yesterday evening).
>     But given these issues here, it doesn't matter for you anymore ;)
> 
> 
>     Best,
>     Robert
> 
> 
> 
>     On Fri, Feb 20, 2015 at 11:48 AM, Sebastian <ssc.open@googlemail.com> wrote:
> 
>         I'm running Flink from my IDE; how do I change this setting in
>         that context?
> 
> 
>         On 20.02.2015 11:41, Fabian Hueske wrote:
> 
>             Have you tried to increase the heap size by shrinking the
>             TM-managed memory?
> 
>             Reduce the fraction (taskmanager.memory.fraction) or fix the
>             amount of TM memory (taskmanager.memory.size) in the
>             flink-conf.yaml [1].
> 
>             Cheers, Fabian
> 
>             [1] http://flink.apache.org/docs/0.8/config.html
> 
> 
>                 On 20 Feb 2015, at 11:30, Sebastian <ssc.open@googlemail.com> wrote:
> 
>                 Hi,
> 
>                 I get a strange out of memory error from the
>                 serialization code when I try to run the following program:
> 
>                 def compute(trackingGraphFile: String, domainIndexFile: String,
>                    outputPath: String) = {
> 
>                 implicit val env = ExecutionEnvironment.getExecutionEnvironment
> 
>                 val edges = GraphUtils.readEdges(trackingGraphFile)
>                 val domains = GraphUtils.readVertices(domainIndexFile)
> 
>                 val domainsByCompany = DomainsByCompany.mapping
>                 val companyEdges = edges.filter { edge =>
>                      domainsByCompany.contains(edge.src.toInt) }
>                    .map { edge => domainsByCompany(edge.src.toInt) -> edge.target.toInt }
>                    .distinct
> 
>                 val companyBitMaps = companyEdges.groupBy(0).reduceGroup {
>                      domainsByCompany: Iterator[(String,Int)] =>
> 
>                      var company = ""
>                      val seenAt = new util.BitSet(42889800)
> 
>                      for ((name, domain) <- domainsByCompany) {
>                        company = name
>                        seenAt.set(domain)
>                      }
> 
>                      company -> seenAt
>                    }
> 
>                    companyBitMaps.print()
> 
>                    env.execute()
> 
>                 }
> 
> 
>                 The error looks as follows:
> 
> 
>                 2015-02-20 11:22:54 INFO  JobClient:345 -
>                 java.lang.OutOfMemoryError: Java heap space
>                          at org.apache.flink.runtime.io
>                 <http://org.apache.flink.runtime.io>.__network.serialization.__DataOutputSerializer.resize(__DataOutputSerializer.java:249)
>                          at org.apache.flink.runtime.io
>                 <http://org.apache.flink.runtime.io>.__network.serialization.__DataOutputSerializer.write(__DataOutputSerializer.java:93)
>                          at
>                 org.apache.flink.api.java.__typeutils.runtime.__DataOutputViewStream.write(__DataOutputViewStream.java:39)
>                          at com.esotericsoftware.kryo.io
>                 <http://com.esotericsoftware.kryo.io>.__Output.flush(Output.java:163)
>                          at com.esotericsoftware.kryo.io
>                 <http://com.esotericsoftware.kryo.io>.__Output.require(Output.java:__142)
>                          at com.esotericsoftware.kryo.io
>                 <http://com.esotericsoftware.kryo.io>.__Output.writeBoolean(Output.__java:613)
>                          at
>                 com.twitter.chill.java.__BitSetSerializer.write(__BitSetSerializer.java:42)
>                          at
>                 com.twitter.chill.java.__BitSetSerializer.write(__BitSetSerializer.java:29)
>                          at
>                 com.esotericsoftware.kryo.__Kryo.writeClassAndObject(Kryo.__java:599)
>                          at
>                 org.apache.flink.api.java.__typeutils.runtime.__KryoSerializer.serialize(__KryoSerializer.java:155)
>                          at
>                 org.apache.flink.api.scala.__typeutils.CaseClassSerializer.__serialize(CaseClassSerializer.__scala:91)
>                          at
>                 org.apache.flink.api.scala.__typeutils.CaseClassSerializer.__serialize(CaseClassSerializer.__scala:30)
>                          at
>                 org.apache.flink.runtime.__plugable.__SerializationDelegate.write(__SerializationDelegate.java:51)
>                          at org.apache.flink.runtime.io
>                 <http://org.apache.flink.runtime.io>.__network.serialization.__SpanningRecordSerializer.__addRecord(__SpanningRecordSerializer.java:__76)
>                          at org.apache.flink.runtime.io
>                 <http://org.apache.flink.runtime.io>.__network.api.RecordWriter.emit(__RecordWriter.java:82)
>                          at
>                 org.apache.flink.runtime.__operators.shipping.__OutputCollector.collect(__OutputCollector.java:88)
>                          at
>                 org.apache.flink.api.scala.__GroupedDataSet$$anon$2.reduce(__GroupedDataSet.scala:262)
>                          at
>                 org.apache.flink.runtime.__operators.GroupReduceDriver.__run(GroupReduceDriver.java:__124)
>                          at
>                 org.apache.flink.runtime.__operators.RegularPactTask.run(__RegularPactTask.java:493)
>                          at
>                 org.apache.flink.runtime.__operators.RegularPactTask.__invoke(RegularPactTask.java:__360)
>                          at
>                 org.apache.flink.runtime.__execution.RuntimeEnvironment.__run(RuntimeEnvironment.java:__257)
>                          at java.lang.Thread.run(Thread.__java:745)
> 
>                 I run the job locally, giving 2GB of Ram to the VM. The
>                 code will produce less than 10 groups and the bitsets
>                 used internally should not be larger than a few megabytes.
> 
>                 Any tips on how to fix this?
> 
>                 Best,
>                 Sebastian
> 
>                 PS: Still waiting for a reduceGroup that gives me the key ;)
> 
> 
> 
> 
> 
> 
> 


Re: OutOfMemory during serialization

Posted by Stephan Ewen <se...@apache.org>.
What happens (in the original stack trace) is the following: The serializer
starts producing the byte stream data and we buffer it, to determine the
length, before sending it over the network. While buffering that data, the
memory runs out.
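
The buffering step described above can be sketched with plain JDK streams. This is only an illustration, not Flink's actual code: the class name, the frame layout (a 4-byte length followed by the payload) and the one-byte-per-bit writer are assumptions made for the example. It shows why the whole record has to sit on the heap before a single byte can be sent.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.BitSet;

// Hypothetical sketch: buffer a full record to learn its length, then frame it.
final class LengthPrefixedBuffering {

    /** Stand-in for a space-inefficient serializer: one byte per bit. */
    static void writeOneBytePerBit(BitSet bits, DataOutputStream out) throws IOException {
        int len = bits.length();          // index of the highest set bit + 1
        out.writeInt(len);
        for (int i = 0; i < len; i++) {
            out.writeBoolean(bits.get(i)); // each boolean occupies one byte
        }
    }

    /** Serializes into a growing heap buffer, then prepends the payload length. */
    static byte[] frame(BitSet bits) {
        try {
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            writeOneBytePerBit(bits, new DataOutputStream(buffer));
            byte[] payload = buffer.toByteArray(); // the complete record is in memory here

            ByteArrayOutputStream framed = new ByteArrayOutputStream(4 + payload.length);
            DataOutputStream out = new DataOutputStream(framed);
            out.writeInt(payload.length);          // the length is only known after buffering
            out.write(payload);
            return framed.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);     // not expected for in-memory streams
        }
    }

    public static void main(String[] args) {
        BitSet bits = new BitSet();
        bits.set(999);
        System.out.println("framed bytes: " + frame(bits).length);
    }
}
```

The larger the serialized form, the larger this intermediate buffer becomes, which is where the error in the stack trace is raised.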

You may simply be short of memory, but it may also be that the
serializer (here the Kryo Chill BitSetSerializer) is extremely
inefficient in terms of space. It seems to write one boolean
(encoded as one byte) per bit, which blows up your bitset roughly eightfold.

A solution may also be to register a better bitset serializer. Chill's
default one seems to be sort of inefficient...
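
To put rough numbers on this, here is a small JDK-only sketch. It assumes, as suggested above, that the default serializer writes one byte for every bit up to the highest set bit; the class and method names are made up for the example. The packed form uses BitSet.toByteArray() and BitSet.valueOf(), which are standard JDK methods.

```java
import java.util.BitSet;

// Hypothetical comparison of a one-byte-per-bit encoding with a packed one.
final class BitSetEncodingSizes {

    /** Bytes needed when every bit up to the highest set bit becomes one byte. */
    static long oneBytePerBitSize(BitSet bits) {
        return bits.length();
    }

    /** Packed encoding from the JDK: eight bits per byte. */
    static byte[] packed(BitSet bits) {
        return bits.toByteArray();
    }

    /** Rebuilds the bitset from its packed form. */
    static BitSet unpack(byte[] data) {
        return BitSet.valueOf(data);
    }

    public static void main(String[] args) {
        BitSet seenAt = new BitSet(42889800); // capacity used in the original program
        seenAt.set(7);
        seenAt.set(42889799);                 // highest index within that capacity

        byte[] bytes = packed(seenAt);
        System.out.println("one byte per bit: " + oneBytePerBitSize(seenAt) + " bytes");
        System.out.println("packed:           " + bytes.length + " bytes");
        System.out.println("round trip ok:    " + unpack(bytes).equals(seenAt));
    }
}
```

A custom Kryo serializer could write the packed bytes instead of individual booleans. How such a serializer is registered depends on the Flink version and is not shown here.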






Re: OutOfMemory during serialization

Posted by Sebastian <ss...@googlemail.com>.
I don't have a build unfortunately, I'm using the maven dependency. I'll 
try to find a workaround. Thx for your help.

-s


Re: OutOfMemory during serialization

Posted by Robert Metzger <rm...@apache.org>.
Hey Sebastian,

I've fixed the issue in this branch:
https://github.com/rmetzger/flink/tree/flink1589:

Configuration c = new Configuration();
c.setFloat(ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY, 0.5f);
final ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(c);
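
As a rough illustration of what lowering the fraction buys, the following sketch computes how much heap stays outside the managed pool. It assumes the managed pool is simply fraction times heap, which ignores network buffers and other overhead, and the 0.7 value is only an illustrative larger setting. The class and method names are hypothetical.

```java
// Hypothetical back-of-the-envelope calculation, not Flink's real memory accounting.
final class ManagedMemorySplit {

    /** Heap (in MB) left outside the managed pool under the simplifying assumption above. */
    static long unmanagedMb(long heapMb, float fraction) {
        long managedMb = (long) (heapMb * (double) fraction);
        return heapMb - managedMb;
    }

    public static void main(String[] args) {
        long heapMb = 2048; // the 2 GB given to the VM in the original report
        System.out.println("fraction 0.7: " + unmanagedMb(heapMb, 0.7f) + " MB unmanaged");
        System.out.println("fraction 0.5: " + unmanagedMb(heapMb, 0.5f) + " MB unmanaged");
    }
}
```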


I'll also backport the fix to the release-0.8 branch to make it available
in the 0.8.2 release.

Maybe you can easily cherry-pick the commit to your 0.8.1 Flink build.


Best,
Robert


Re: OutOfMemory during serialization

Posted by Robert Metzger <rm...@apache.org>.
Hi Sebastian,

Looks like you've found a limitation of Flink.
I've already filed two JIRAs to resolve the issue (
https://issues.apache.org/jira/browse/FLINK-1588,
https://issues.apache.org/jira/browse/FLINK-1589).

I don't know your setup. If you use Flink just as a dependency, without the
source checked out, there is probably no way right now to change
the configuration settings.
In that case, you have to start a local cluster yourself (./bin/start-local.sh,
with your settings in conf/flink-conf.yaml). You can then submit your
job either with ./bin/flink or through a
RemoteExecutionEnvironment (ExecutionEnvironment.createRemoteEnvironment()).

If you have the Flink source checked out, you can also hard-code the
configuration values into org.apache.flink.client.LocalExecutor.


By the way, Flink 0.8.1 is now available on maven central (I suspect you
had to build it yourself yesterday evening).
But given these issues here, it doesn't matter for you anymore ;)


Best,
Robert



On Fri, Feb 20, 2015 at 11:48 AM, Sebastian <ss...@googlemail.com> wrote:

> I'm running Flink from my IDE. How do I change this setting in that context?
>
>
> On 20.02.2015 11:41, Fabian Hueske wrote:
>
>> Have you tried to increase the heap size by shrinking the TM-managed
>> memory?
>>
>> Reduce the fraction (taskmanager.memory.fraction) or fix the amount of TM
>> memory (taskmanager.memory.size) in the flink-conf.yaml [1].
>>
>> Cheers, Fabian
>>
>> [1] http://flink.apache.org/docs/0.8/config.html
>>
>>
>>  On 20 Feb 2015, at 11:30, Sebastian <ss...@googlemail.com> wrote:
>>>
>>> Hi,
>>>
>>> I get a strange out of memory error from the serialization code when I
>>> try to run the following program:
>>>
>>> def compute(trackingGraphFile: String, domainIndexFile: String,
>>>   outputPath: String) = {
>>>
>>> implicit val env = ExecutionEnvironment.getExecutionEnvironment
>>>
>>> val edges = GraphUtils.readEdges(trackingGraphFile)
>>> val domains = GraphUtils.readVertices(domainIndexFile)
>>>
>>> val domainsByCompany = DomainsByCompany.mapping
>>> val companyEdges = edges.filter { edge =>
>>>     domainsByCompany.contains(edge.src.toInt) }
>>>   .map { edge => domainsByCompany(edge.src.toInt) -> edge.target.toInt }
>>>   .distinct
>>>
>>> val companyBitMaps = companyEdges.groupBy(0).reduceGroup {
>>>     domainsByCompany: Iterator[(String,Int)] =>
>>>
>>>     var company = ""
>>>     val seenAt = new util.BitSet(42889800)
>>>
>>>     for ((name, domain) <- domainsByCompany) {
>>>       company = name
>>>       seenAt.set(domain)
>>>     }
>>>
>>>     company -> seenAt
>>>   }
>>>
>>>   companyBitMaps.print()
>>>
>>>   env.execute()
>>>
>>> }
>>>
>>>
>>> The error looks as follows:
>>>
>>>
>>> 2015-02-20 11:22:54 INFO  JobClient:345 - java.lang.OutOfMemoryError: Java heap space
>>>         at org.apache.flink.runtime.io.network.serialization.DataOutputSerializer.resize(DataOutputSerializer.java:249)
>>>         at org.apache.flink.runtime.io.network.serialization.DataOutputSerializer.write(DataOutputSerializer.java:93)
>>>         at org.apache.flink.api.java.typeutils.runtime.DataOutputViewStream.write(DataOutputViewStream.java:39)
>>>         at com.esotericsoftware.kryo.io.Output.flush(Output.java:163)
>>>         at com.esotericsoftware.kryo.io.Output.require(Output.java:142)
>>>         at com.esotericsoftware.kryo.io.Output.writeBoolean(Output.java:613)
>>>         at com.twitter.chill.java.BitSetSerializer.write(BitSetSerializer.java:42)
>>>         at com.twitter.chill.java.BitSetSerializer.write(BitSetSerializer.java:29)
>>>         at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:599)
>>>         at org.apache.flink.api.java.typeutils.runtime.KryoSerializer.serialize(KryoSerializer.java:155)
>>>         at org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:91)
>>>         at org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:30)
>>>         at org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:51)
>>>         at org.apache.flink.runtime.io.network.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:76)
>>>         at org.apache.flink.runtime.io.network.api.RecordWriter.emit(RecordWriter.java:82)
>>>         at org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:88)
>>>         at org.apache.flink.api.scala.GroupedDataSet$$anon$2.reduce(GroupedDataSet.scala:262)
>>>         at org.apache.flink.runtime.operators.GroupReduceDriver.run(GroupReduceDriver.java:124)
>>>         at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:493)
>>>         at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:360)
>>>         at org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:257)
>>>         at java.lang.Thread.run(Thread.java:745)
>>>
>>> I run the job locally, giving 2GB of Ram to the VM. The code will
>>> produce less than 10 groups and the bitsets used internally should not be
>>> larger than a few megabytes.
>>>
>>> Any tips on how to fix this?
>>>
>>> Best,
>>> Sebastian
>>>
>>> PS: Still waiting for a reduceGroup that gives me the key ;)
>>>
>>>
>>>
>>>
>>

Re: OutOfMemory during serialization

Posted by Sebastian <ss...@googlemail.com>.
I'm running Flink from my IDE. How do I change this setting in that context?

On 20.02.2015 11:41, Fabian Hueske wrote:
> Have you tried to increase the heap size by shrinking the TM-managed memory?
>
> Reduce the fraction (taskmanager.memory.fraction) or fix the amount of TM memory (taskmanager.memory.size) in the flink-conf.yaml [1].
>
> Cheers, Fabian
>
> [1] http://flink.apache.org/docs/0.8/config.html

Re: OutOfMemory during serialization

Posted by Fabian Hueske <fh...@gmail.com>.
Have you tried to increase the heap size by shrinking the TM-managed memory?

Either reduce the fraction (taskmanager.memory.fraction) or set a fixed amount of TM-managed memory (taskmanager.memory.size) in flink-conf.yaml [1].

Cheers, Fabian

[1] http://flink.apache.org/docs/0.8/config.html
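
As a rough sketch of what that could look like in conf/flink-conf.yaml: the two keys are the ones named above, but the values are only illustrative assumptions, not tuned recommendations. You would normally set one of the two, not both. As far as I recall from the 0.8 docs, the fraction defaults to 0.7 and the size is given in megabytes, so please double-check against [1].

```
# Option A (assumed example value): give a smaller share of the heap to
# TM-managed memory, leaving more heap for user code and serialization buffers.
taskmanager.memory.fraction: 0.5

# Option B (assumed example value, in MB): pin the managed memory to a fixed
# amount instead of a fraction. Leave this commented out if you use Option A.
# taskmanager.memory.size: 512
```

Whatever the managed memory does not claim stays available as regular JVM heap, which is where the serializer's growing output buffer in the stack trace is allocated.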

