You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@giraph.apache.org by Denis Dudinski <de...@gmail.com> on 2016/11/14 15:44:14 UTC

Computation hangs when Combiner is set and OOC is enabled

Hello,

We are using OutOofCore functionality to perform computations over
huge graph (billions of nodes).

Recently we have faced a situation when all our workers stuck doing
nothing except performing System.gc() triggered from Giraph's
ThresholdBasedOracle. The intriguing point was that no memory was
freed at all at each gc. At the same time our memory consumption level
was above highMemoryPressure and all commands that Oracle could give
to IO scheduler were STORE_MESSAGES_AND_BUFFERS and STORE_PARTITION.
However, there was NO partitions, messages or buffers available for
offloading.

We looked into state of the MetaPartitionManager and discovered that
according to state matrix within it all unprocessed partitions are
already spilled to disk as well as their messages. But there were no
data for messages stored on disk. A little bit more struggle and we
discovered that our RAM space was almost entirely consumed by incoming
messages placed in OneMessagePerVertexStore instance. Then we looked
into DiskBackedMessageStore and found out that it just don't offloads
any incoming message data when we use message combiner (please see
org.apache.giraph.ooc.data.DiskBackedMessageStore#offloadPartitionData
and org.apache.giraph.ooc.command.StoreIncomingMessageIOCommand).

This situation can be reproduced easily using big enough graph and two
workers with small amount of RAM and OOC enabled (and configured
properly). Even with combiner, which leaves only one message per
vertex, number of partitions and vertices can be too big to hold
incoming message data entirely in memory.

Can we somehow work around such limitation and NOT disable Combiner?

Our test computation config looks like this:

hadoop jar /opt/giraph-1.2.0/pr-job-jar-with-dependencies.jar
org.apache.giraph.GiraphRunner com.prototype.di.pr.PageRankComputation
\
-mc com.prototype.di.pr.PageRankMasterCompute \
-yj pr-job-jar-with-dependencies.jar \
-vif com.prototype.di.pr.input.HBLongVertexInputFormat \
-vof org.apache.giraph.io.formats.IdWithValueTextOutputFormat \
-op /user/hadoop/output/pr_test \
-w 2 \
-c com.prototype.di.pr.PRDoubleCombiner \
-wc com.prototype.di.pr.PageRankWorkerContext \
-ca hbase.rootdir=hdfs://namenode1.testcluster.com:8020/hbase \
-ca giraph.logLevel=info \
-ca hbase.mapreduce.inputtable=di_test \
-ca hbase.mapreduce.scan.columns=di:n \
-ca hbase.defaults.for.version.skip=true \
-ca hbase.table.row.textkey=false \
-ca giraph.yarn.task.heap.mb=10000 \
-ca giraph.isStaticGraph=true \
-ca giraph.SplitMasterWorker=false \
-ca giraph.oneToAllMsgSending=true \
-ca giraph.metrics.enable=false \
-ca giraph.jmap.histo.enable=false \
-ca giraph.vertexIdClass=com.prototype.di.pr.DomainPartAwareLongWritable \
-ca giraph.outgoingMessageValueClass=org.apache.hadoop.io.DoubleWritable \
-ca giraph.addDebugOpt=true \
-ca giraph.useOutOfCoreGraph=true \
-ca giraph.waitForPerWorkerRequests=true \
-ca giraph.maxNumberOfUnsentRequests=1000 \
-ca giraph.vertexInputFilterClass=com.prototype.di.pr.input.PagesFromSameDomainLimiter
\
-ca giraph.pr.di.maxPagesFromSameDomain=-1 \
-ca giraph.useInputSplitLocality=true \
-ca hbase.mapreduce.scan.cachedrows=1000 \
-ca giraph.minPartitionsPerComputeThread=150 \
-ca giraph.graphPartitionerFactoryClass=com.prototype.di.pr.DomainAwareGraphPartitionerFactory
\
-ca giraph.numInputThreads=1 \
-ca giraph.inputSplitSamplePercent=1 \
-ca giraph.pr.maxNeighborsPerVertex=256 \
-ca giraph.partitionClass=org.apache.giraph.partition.ByteArrayPartition \
-ca giraph.vertexClass=org.apache.giraph.graph.ByteValueVertex \
-ca giraph.inputOutEdgesClass=org.apache.giraph.edge.LongNullArrayEdges \
-ca giraph.numComputeThreads=2 \
-ca giraph.memory.failPressure=0.6 \
-ca giraph.memory.emergencyPressure=0.575 \
-ca giraph.memory.highPressure=0.55 \
-ca giraph.memory.optimalPressure=0.525 \
-ca giraph.memory.lowPressure=0.5

Thank you in advance.

Best Regards,
Denis Dudinski

Re: Computation hangs when Combiner is set and OOC is enabled

Posted by Denis Dudinski <de...@gmail.com>.
Hi Hassan,

Thank you very much for the quick response!

Can increasing the number of partitions really help in this case? All
partition data needed for calculations is already offloaded to disk, so
there are no partitions in memory. OneMessagePerVertexStore contains
vertexes for ALL partitions in memory (so it's size is similar to combined
sizes of all offloaded partitions in our case, vertex id is long and value
is double) and is not going to offload them with the increase of partitions
number.

Am I missing some point?

We also tried to run computation with the same settings but* without
combiner* and the computation got stuck too with the same symptoms. In this
case I think the reason may be in inconsistency between settings
giraph.minPartitionsPerComputeThread (150 in our case) and
giraph.flushBufferSize (8 MB by default). It seems that
org.apache.giraph.ooc.data.DiskBackedDataStore#getCandidateBuffersToOffload
can't choose any of partitions for offloading since their estimated buffer
sizes are below 8 MB.
Maybe the buffer data size estimation algorithm for each partition buffer
is too optimistic. According to heap dump one of partitions estimated size
in "left" MutablePair fiels is equal to 3679863 bytes (roughly 3,5 MB)
while heap dump shows for MutablePair's "right" field retained size of
25 444 296 (right java.util.ArrayList @ 0x551038578 25 444 296), which is
roughly 24 MB. It is almost 7 times bigger than estimated. Total size of
retained heap for org.apache.giraph.ooc.data.DiskBackedMessageStore is
equal to 5 812 238 448 bytes.

I think one more thing worth mentioning is our usage scenario. While we are
interested in quick computation, what is far more important to us is the
computation liveness. We can wait for the computation to complete for
several days or even week or two, if needed, but we must be sure it can
complete.

One more observation (maybe little bit offtopic): should we really have
reference to giraph configuration in each vertex instance? As I understand
config is immutable at times when it is injected into vertexes. Ref to
config consumes 8 bytes (x64) of memory for each object. Maybe the ref can
be replaced with static call to some static or thread local config holder
object?

Thank you once more!

Best Regards,
Denis Dudinski

2016-11-14 23:57 GMT+03:00 Hassan Eslami <hs...@gmail.com>:

> Hi Denis,
>
> Thanks for trying out the new OOC design. As you mentioned, by using
> message combiner we only keep one message for each vertex at all time. That
> means the storage needed for messages is #ofVertices*(SizeOfVertexId +
> SizeOfOneMessage). We studied several applications that use message
> combiner and noticed that in those applications message type is rather
> simple (usually a double or a pair of double or similar types). The vertex
> ID is, also, usually simple (a long value in most cases). That means per
> vertex we are keeping only 16-20 bytes for its message. We could offload
> (and in fact, it is very simple to) offload messages in case of having
> message combiners, but, we noticed that we could achieve a much better
> performance if we do not do so. In other words, it was intentional to not
> offload messages when message combiner is used. Although, message flow
> control is still in effect (and much needed) even with message combiners.
>
> I do not suggest you disable message combiner, as it further reduces the
> performance. Rather, *I suggest you increase the number of partitions per
> machine*. If you still see the issue (meaning that the RAM is too small
> to even hold one message per vertex), you can create a JIRA, mention
> exactly the numbers you have in your case (e.g., size of message, size of
> vertex id, size of RAM so there is a justification for the feature) and
> assign it to me, so that I add the option to offload the messages even when
> combiners are used.
>
> Also, I encourage you to not use the "isStaticGraph" option until it is
> completely fixed and tested.
>
> Best,
> Hassan
>
> On Mon, Nov 14, 2016 at 9:44 AM, Denis Dudinski <de...@gmail.com>
> wrote:
>
>> Hello,
>>
>> We are using OutOofCore functionality to perform computations over
>> huge graph (billions of nodes).
>>
>> Recently we have faced a situation when all our workers stuck doing
>> nothing except performing System.gc() triggered from Giraph's
>> ThresholdBasedOracle. The intriguing point was that no memory was
>> freed at all at each gc. At the same time our memory consumption level
>> was above highMemoryPressure and all commands that Oracle could give
>> to IO scheduler were STORE_MESSAGES_AND_BUFFERS and STORE_PARTITION.
>> However, there was NO partitions, messages or buffers available for
>> offloading.
>>
>> We looked into state of the MetaPartitionManager and discovered that
>> according to state matrix within it all unprocessed partitions are
>> already spilled to disk as well as their messages. But there were no
>> data for messages stored on disk. A little bit more struggle and we
>> discovered that our RAM space was almost entirely consumed by incoming
>> messages placed in OneMessagePerVertexStore instance. Then we looked
>> into DiskBackedMessageStore and found out that it just don't offloads
>> any incoming message data when we use message combiner (please see
>> org.apache.giraph.ooc.data.DiskBackedMessageStore#offloadPartitionData
>> and org.apache.giraph.ooc.command.StoreIncomingMessageIOCommand).
>>
>> This situation can be reproduced easily using big enough graph and two
>> workers with small amount of RAM and OOC enabled (and configured
>> properly). Even with combiner, which leaves only one message per
>> vertex, number of partitions and vertices can be too big to hold
>> incoming message data entirely in memory.
>>
>> Can we somehow work around such limitation and NOT disable Combiner?
>>
>> Our test computation config looks like this:
>>
>> hadoop jar /opt/giraph-1.2.0/pr-job-jar-with-dependencies.jar
>> org.apache.giraph.GiraphRunner com.prototype.di.pr.PageRankComputation
>> \
>> -mc com.prototype.di.pr.PageRankMasterCompute \
>> -yj pr-job-jar-with-dependencies.jar \
>> -vif com.prototype.di.pr.input.HBLongVertexInputFormat \
>> -vof org.apache.giraph.io.formats.IdWithValueTextOutputFormat \
>> -op /user/hadoop/output/pr_test \
>> -w 2 \
>> -c com.prototype.di.pr.PRDoubleCombiner \
>> -wc com.prototype.di.pr.PageRankWorkerContext \
>> -ca hbase.rootdir=hdfs://namenode1.testcluster.com:8020/hbase \
>> -ca giraph.logLevel=info \
>> -ca hbase.mapreduce.inputtable=di_test \
>> -ca hbase.mapreduce.scan.columns=di:n \
>> -ca hbase.defaults.for.version.skip=true \
>> -ca hbase.table.row.textkey=false \
>> -ca giraph.yarn.task.heap.mb=10000 \
>> -ca giraph.isStaticGraph=true \
>> -ca giraph.SplitMasterWorker=false \
>> -ca giraph.oneToAllMsgSending=true \
>> -ca giraph.metrics.enable=false \
>> -ca giraph.jmap.histo.enable=false \
>> -ca giraph.vertexIdClass=com.prototype.di.pr.DomainPartAwareLongWritable
>> \
>> -ca giraph.outgoingMessageValueClass=org.apache.hadoop.io.DoubleWritable
>> \
>> -ca giraph.addDebugOpt=true \
>> -ca giraph.useOutOfCoreGraph=true \
>> -ca giraph.waitForPerWorkerRequests=true \
>> -ca giraph.maxNumberOfUnsentRequests=1000 \
>> -ca giraph.vertexInputFilterClass=com.prototype.di.pr.input.Page
>> sFromSameDomainLimiter
>> \
>> -ca giraph.pr.di.maxPagesFromSameDomain=-1 \
>> -ca giraph.useInputSplitLocality=true \
>> -ca hbase.mapreduce.scan.cachedrows=1000 \
>> -ca giraph.minPartitionsPerComputeThread=150 \
>> -ca giraph.graphPartitionerFactoryClass=com.prototype.di.pr.Doma
>> inAwareGraphPartitionerFactory
>> \
>> -ca giraph.numInputThreads=1 \
>> -ca giraph.inputSplitSamplePercent=1 \
>> -ca giraph.pr.maxNeighborsPerVertex=256 \
>> -ca giraph.partitionClass=org.apache.giraph.partition.ByteArrayPartition
>> \
>> -ca giraph.vertexClass=org.apache.giraph.graph.ByteValueVertex \
>> -ca giraph.inputOutEdgesClass=org.apache.giraph.edge.LongNullArrayEdges \
>> -ca giraph.numComputeThreads=2 \
>> -ca giraph.memory.failPressure=0.6 \
>> -ca giraph.memory.emergencyPressure=0.575 \
>> -ca giraph.memory.highPressure=0.55 \
>> -ca giraph.memory.optimalPressure=0.525 \
>> -ca giraph.memory.lowPressure=0.5
>>
>> Thank you in advance.
>>
>> Best Regards,
>> Denis Dudinski
>>
>
>

Re: Computation hangs when Combiner is set and OOC is enabled

Posted by Hassan Eslami <hs...@gmail.com>.
Hi Denis,

Thanks for trying out the new OOC design. As you mentioned, by using
message combiner we only keep one message for each vertex at all time. That
means the storage needed for messages is #ofVertices*(SizeOfVertexId +
SizeOfOneMessage). We studied several applications that use message
combiner and noticed that in those applications message type is rather
simple (usually a double or a pair of double or similar types). The vertex
ID is, also, usually simple (a long value in most cases). That means per
vertex we are keeping only 16-20 bytes for its message. We could offload
(and in fact, it is very simple to) offload messages in case of having
message combiners, but, we noticed that we could achieve a much better
performance if we do not do so. In other words, it was intentional to not
offload messages when message combiner is used. Although, message flow
control is still in effect (and much needed) even with message combiners.

I do not suggest you disable message combiner, as it further reduces the
performance. Rather, *I suggest you increase the number of partitions per
machine*. If you still see the issue (meaning that the RAM is too small to
even hold one message per vertex), you can create a JIRA, mention exactly
the numbers you have in your case (e.g., size of message, size of vertex
id, size of RAM so there is a justification for the feature) and assign it
to me, so that I add the option to offload the messages even when combiners
are used.

Also, I encourage you to not use the "isStaticGraph" option until it is
completely fixed and tested.

Best,
Hassan

On Mon, Nov 14, 2016 at 9:44 AM, Denis Dudinski <de...@gmail.com>
wrote:

> Hello,
>
> We are using OutOofCore functionality to perform computations over
> huge graph (billions of nodes).
>
> Recently we have faced a situation when all our workers stuck doing
> nothing except performing System.gc() triggered from Giraph's
> ThresholdBasedOracle. The intriguing point was that no memory was
> freed at all at each gc. At the same time our memory consumption level
> was above highMemoryPressure and all commands that Oracle could give
> to IO scheduler were STORE_MESSAGES_AND_BUFFERS and STORE_PARTITION.
> However, there was NO partitions, messages or buffers available for
> offloading.
>
> We looked into state of the MetaPartitionManager and discovered that
> according to state matrix within it all unprocessed partitions are
> already spilled to disk as well as their messages. But there were no
> data for messages stored on disk. A little bit more struggle and we
> discovered that our RAM space was almost entirely consumed by incoming
> messages placed in OneMessagePerVertexStore instance. Then we looked
> into DiskBackedMessageStore and found out that it just don't offloads
> any incoming message data when we use message combiner (please see
> org.apache.giraph.ooc.data.DiskBackedMessageStore#offloadPartitionData
> and org.apache.giraph.ooc.command.StoreIncomingMessageIOCommand).
>
> This situation can be reproduced easily using big enough graph and two
> workers with small amount of RAM and OOC enabled (and configured
> properly). Even with combiner, which leaves only one message per
> vertex, number of partitions and vertices can be too big to hold
> incoming message data entirely in memory.
>
> Can we somehow work around such limitation and NOT disable Combiner?
>
> Our test computation config looks like this:
>
> hadoop jar /opt/giraph-1.2.0/pr-job-jar-with-dependencies.jar
> org.apache.giraph.GiraphRunner com.prototype.di.pr.PageRankComputation
> \
> -mc com.prototype.di.pr.PageRankMasterCompute \
> -yj pr-job-jar-with-dependencies.jar \
> -vif com.prototype.di.pr.input.HBLongVertexInputFormat \
> -vof org.apache.giraph.io.formats.IdWithValueTextOutputFormat \
> -op /user/hadoop/output/pr_test \
> -w 2 \
> -c com.prototype.di.pr.PRDoubleCombiner \
> -wc com.prototype.di.pr.PageRankWorkerContext \
> -ca hbase.rootdir=hdfs://namenode1.testcluster.com:8020/hbase \
> -ca giraph.logLevel=info \
> -ca hbase.mapreduce.inputtable=di_test \
> -ca hbase.mapreduce.scan.columns=di:n \
> -ca hbase.defaults.for.version.skip=true \
> -ca hbase.table.row.textkey=false \
> -ca giraph.yarn.task.heap.mb=10000 \
> -ca giraph.isStaticGraph=true \
> -ca giraph.SplitMasterWorker=false \
> -ca giraph.oneToAllMsgSending=true \
> -ca giraph.metrics.enable=false \
> -ca giraph.jmap.histo.enable=false \
> -ca giraph.vertexIdClass=com.prototype.di.pr.DomainPartAwareLongWritable \
> -ca giraph.outgoingMessageValueClass=org.apache.hadoop.io.DoubleWritable \
> -ca giraph.addDebugOpt=true \
> -ca giraph.useOutOfCoreGraph=true \
> -ca giraph.waitForPerWorkerRequests=true \
> -ca giraph.maxNumberOfUnsentRequests=1000 \
> -ca giraph.vertexInputFilterClass=com.prototype.di.pr.input.
> PagesFromSameDomainLimiter
> \
> -ca giraph.pr.di.maxPagesFromSameDomain=-1 \
> -ca giraph.useInputSplitLocality=true \
> -ca hbase.mapreduce.scan.cachedrows=1000 \
> -ca giraph.minPartitionsPerComputeThread=150 \
> -ca giraph.graphPartitionerFactoryClass=com.prototype.di.pr.
> DomainAwareGraphPartitionerFactory
> \
> -ca giraph.numInputThreads=1 \
> -ca giraph.inputSplitSamplePercent=1 \
> -ca giraph.pr.maxNeighborsPerVertex=256 \
> -ca giraph.partitionClass=org.apache.giraph.partition.ByteArrayPartition \
> -ca giraph.vertexClass=org.apache.giraph.graph.ByteValueVertex \
> -ca giraph.inputOutEdgesClass=org.apache.giraph.edge.LongNullArrayEdges \
> -ca giraph.numComputeThreads=2 \
> -ca giraph.memory.failPressure=0.6 \
> -ca giraph.memory.emergencyPressure=0.575 \
> -ca giraph.memory.highPressure=0.55 \
> -ca giraph.memory.optimalPressure=0.525 \
> -ca giraph.memory.lowPressure=0.5
>
> Thank you in advance.
>
> Best Regards,
> Denis Dudinski
>