Posted to issues@spark.apache.org by "Himangshu Ranjan Borah (JIRA)" <ji...@apache.org> on 2018/07/24 21:33:00 UTC

[jira] [Created] (SPARK-24910) Spark Bloom Filter Closure Serialization improvement for very high volume of Data

Himangshu Ranjan Borah created SPARK-24910:
----------------------------------------------

             Summary: Spark Bloom Filter Closure Serialization improvement for very high volume of Data
                 Key: SPARK-24910
                 URL: https://issues.apache.org/jira/browse/SPARK-24910
             Project: Spark
          Issue Type: Improvement
          Components: Spark Core, SQL
    Affects Versions: 2.3.1
            Reporter: Himangshu Ranjan Borah


I am proposing an improvement to the Bloom filter generation logic used by the Bloom filter API in DataFrameStatFunctions: use mapPartitions() instead of aggregate() to avoid closure serialization, which fails for huge BitArrays.

The Bloom filter implementation in Spark's stat functions uses aggregate/treeAggregate operations, which use a closure that depends on a Bloom filter created in the driver. Since Spark hard-codes the closure serializer to the Java serializer, closure cleanup fails for very large Bloom filters (typically with num items in the billions and fpp ~ 0.001). The Kryo serializer works fine at that scale, but there seem to have been some issues with using Kryo for closure serialization, because of which Spark 2.0 hard-coded it to Java. The call stack that we get typically looks like this:

java.lang.OutOfMemoryError
    at java.io.ByteArrayOutputStream.hugeCapacity(ByteArrayOutputStream.java:123)
    at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:117)
    at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
    at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153)
    at org.apache.spark.util.ByteBufferOutputStream.write(ByteBufferOutputStream.scala:41)
    at java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1877)
    at java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1786)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1189)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:43)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:342)
    at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:335)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:159)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:2292)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2022)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2124)
    at org.apache.spark.rdd.RDD$$anonfun$fold$1.apply(RDD.scala:1092)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
    at org.apache.spark.rdd.RDD.fold(RDD.scala:1086)
    at org.apache.spark.rdd.RDD$$anonfun$treeAggregate$1.apply(RDD.scala:1155)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
    at org.apache.spark.rdd.RDD.treeAggregate(RDD.scala:1131)
    at org.apache.spark.sql.DataFrameStatFunctions.buildBloomFilter(DataFrameStatFunctions.scala:554)
    at org.apache.spark.sql.DataFrameStatFunctions.bloomFilter(DataFrameStatFunctions.scala:505)
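
The hugeCapacity frame at the top of the trace suggests that the serialized closure outgrew what a single Java byte array can hold (a little under 2 GB). As a rough check, the sketch below applies the standard Bloom filter sizing formula, m = -n * ln(p) / (ln 2)^2, to the sizes mentioned above. The class and method names are hypothetical, and the exact sizing Spark uses internally is an assumption here, not something taken from this report. At fpp = 0.001 the formula gives about 14.4 bits per item, so one billion items need roughly 1.8 GB of bits and two billion items need roughly 3.6 GB.

```java
/** Sizing arithmetic for a standard Bloom filter (illustrative only, not Spark code). */
final class BloomFilterSizing {

    /** Optimal number of bits m = -n * ln(p) / (ln 2)^2 for n items and false-positive rate p. */
    static long optimalNumBits(long expectedItems, double fpp) {
        return (long) (-expectedItems * Math.log(fpp) / (Math.log(2) * Math.log(2)));
    }

    /** Bytes needed to hold the bit array, rounded up to a whole byte. */
    static long bitArrayBytes(long numBits) {
        return (numBits + 7) / 8;
    }

    public static void main(String[] args) {
        double fpp = 0.001;
        for (long n : new long[] {100_000_000L, 1_000_000_000L, 2_000_000_000L}) {
            long bytes = bitArrayBytes(optimalNumBits(n, fpp));
            // A Java byte[] cannot exceed Integer.MAX_VALUE elements, which also bounds
            // what a ByteArrayOutputStream can buffer during Java serialization.
            boolean fitsInOneArray = bytes <= Integer.MAX_VALUE;
            System.out.printf("n=%d -> %d bytes, fits in one byte[]: %b%n",
                    n, bytes, fitsInOneArray);
        }
    }
}
```

Serialization overhead comes on top of the raw bit array, so the practical limit is likely reached somewhat before the bit array alone hits 2 GB.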

This issue can be overcome if we *don't* use the *aggregate()* operations for Bloom filter generation and instead use *mapPartitions()*-style operations, where we create the Bloom filters inside the executors (giving them enough memory and controlling the number of executors and partitions) and return one Bloom filter per partition. We can then aggregate just those Bloom filters, either in the driver itself or in a distributed manner using treeAggregate(). This way we can use the Kryo serializer for just the returned Bloom filters; it works fine on big datasets and scales with the cluster specifications. Please comment on this, or let me know of any upcoming improvements for this issue that you might already be working on.
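
The idea can be illustrated without Spark. The following is a minimal simulation, not the proposed patch: each "partition" builds its own filter locally, and the per-partition filters are then combined with a bitwise OR. TinyBloom, buildForPartition and mergeAll are hypothetical names invented for this sketch, and the hashing is deliberately simplistic. In Spark, the analogous pieces would presumably be BloomFilter.create() inside the mapPartitions() body and BloomFilter.mergeInPlace() for the merge, both from org.apache.spark.util.sketch.

```java
import java.util.Arrays;
import java.util.BitSet;
import java.util.List;

/** Spark-free simulation: one small Bloom filter per "partition", then OR them together. */
final class PartitionedBloomSketch {

    /** Minimal Bloom filter over long keys; sizes and hashing are illustrative, not Spark's. */
    static final class TinyBloom {
        final int numBits;
        final int numHashes;
        final BitSet bits;

        TinyBloom(int numBits, int numHashes) {
            this.numBits = numBits;
            this.numHashes = numHashes;
            this.bits = new BitSet(numBits);
        }

        /** 64-bit mixing function (SplitMix64 finalizer). */
        private static long mix(long z) {
            z = (z ^ (z >>> 30)) * 0xBF58476D1CE4E5B9L;
            z = (z ^ (z >>> 27)) * 0x94D049BB133111EBL;
            return z ^ (z >>> 31);
        }

        /** Bit index of the i-th probe for a key, derived by double hashing. */
        private int index(long key, int i) {
            long h1 = mix(key);
            long h2 = mix(h1 ^ 0x9E3779B97F4A7C15L) | 1L;
            return (int) Math.floorMod(h1 + i * h2, (long) numBits);
        }

        void put(long key) {
            for (int i = 0; i < numHashes; i++) bits.set(index(key, i));
        }

        boolean mightContain(long key) {
            for (int i = 0; i < numHashes; i++) {
                if (!bits.get(index(key, i))) return false;
            }
            return true;
        }

        /** Union with a filter of identical shape: bitwise OR of the two bit arrays. */
        TinyBloom mergeInPlace(TinyBloom other) {
            if (other.numBits != numBits || other.numHashes != numHashes) {
                throw new IllegalArgumentException("incompatible filters");
            }
            bits.or(other.bits);
            return this;
        }
    }

    /** Stand-in for a mapPartitions() body: build a filter locally from one partition. */
    static TinyBloom buildForPartition(long[] partition, int numBits, int numHashes) {
        TinyBloom bf = new TinyBloom(numBits, numHashes);
        for (long key : partition) bf.put(key);
        return bf;
    }

    /** Stand-in for the driver-side (or tree-style) merge of the per-partition filters. */
    static TinyBloom mergeAll(List<long[]> partitions, int numBits, int numHashes) {
        TinyBloom merged = new TinyBloom(numBits, numHashes);
        for (long[] p : partitions) {
            merged.mergeInPlace(buildForPartition(p, numBits, numHashes));
        }
        return merged;
    }

    public static void main(String[] args) {
        List<long[]> partitions = Arrays.asList(
                new long[] {1, 2, 3}, new long[] {4, 5}, new long[] {6});
        TinyBloom merged = mergeAll(partitions, 1024, 3);
        TinyBloom single = buildForPartition(new long[] {1, 2, 3, 4, 5, 6}, 1024, 3);
        System.out.println("merged equals single-pass filter: "
                + merged.bits.equals(single.bits));
    }
}
```

The point of the design is that the function shipped to each partition only needs the filter's shape (here numBits and numHashes), not a pre-built filter from the driver, so nothing large has to pass through closure serialization. Because a Bloom filter union is a bitwise OR, merging the per-partition filters yields the same bits as a single filter built over all the data.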



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
