You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@spark.apache.org by matthes <md...@sensenetworks.com> on 2014/09/22 18:11:59 UTC

Setup an huge Unserializable Object in a mapper

Hello everybody!

I’m newbe in spark and I hope my problem is solvable!
I need to setup an instance which I want to use is a mapper function. The
problem is it is not Serializable and the broadcast function is no option
for me. The Instance can become very huge (e.g. 1GB-10GB). Is there a way to
setup the getTree function only onetime per prozess like in hadoop. Because
at the moment it will be called for every partition and then I ran out of
memory. The second question is, is there also a secure way to limit the
tasks of mapper that I will never get more as the defined limit?
If this way is totally wrong, please let me know. I’m open for any ideas.

My first try is:

val countresult = file.mapPartitions { valueIterator =>

        val s2tree = getTree(bcTreefilename.value) 

        valueIterator.map { x =>
          val split = x.split("\t")
          val result: String = ""
          val key = split(1)
          var value = CountContainer(split(3).toInt)
           
          if (s2tree.lookupContainingCellsSimple(new
S2CellId(split(2).toLong))) {
            value.exposureCnt = value.totalCnt
          }

          (key, value)
        }
      }.reduceByKey{ (x,y) => x.add(y); x}.cache()

Best,

Matthias




--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Setup-an-huge-Unserializable-Object-in-a-mapper-tp14817.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
For additional commands, e-mail: user-help@spark.apache.org


Re: Setup an huge Unserializable Object in a mapper

Posted by matthes <md...@sensenetworks.com>.
I solved it :) I moved the lookupObject into the function where I create the
broadcast and now all works very well!

object lookupObject 
{ 
private var treeFile : org.apache.spark.broadcast.Broadcast[String] = _ 

def main(args: Array[String]): Unit = { 
… 
val treeFile = sc.broadcast(args(0)) 

object treeContainer 
{ 
          val tree : S2Lookup = loadTree 
          
          def dolookup(id : Long) : Boolean = 
          { 
            return tree.lookupSimple(new S2CellId(id)) 
          } 
          def loadTree() : S2Lookup = 
          { 
            val path = new Path(treeFile.value); // treeFile is everytime
null 
            val fileSystem = FileSystem.get(new Configuration()) 
            new S2Lookup(ConversionUtils.deserializeCovering(new
InputStreamReader(fileSystem.open(path)))) 
          } 
}

… 
} 

} 



--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Setup-an-huge-Unserializable-Object-in-a-mapper-tp14817p14916.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
For additional commands, e-mail: user-help@spark.apache.org


Re: Setup an huge Unserializable Object in a mapper

Posted by matthes <md...@sensenetworks.com>.
Thank you for the answer and sorry for the double question, but now it works!
I have one additional question, is it possible to use a broadcast variable
in this object, at the moment I try it in the way below, but the broadcast
object is still null.

object lookupObject
{
private var treeFile : org.apache.spark.broadcast.Broadcast[String] = _

def main(args: Array[String]): Unit = {
…
val treeFile = sc.broadcast(args(0))
…
}
object treeContainer 
{
	  val tree : S2Lookup = loadTree
	  
	  def dolookup(id : Long) : Boolean =
	  {
	    return tree.lookupSimple(new S2CellId(id))
	  }
	  def loadTree() : S2Lookup =
	  {
	    val path = new Path(treeFile.value); // treeFile is everytime null
	    val fileSystem = FileSystem.get(new Configuration())
	    new S2Lookup(ConversionUtils.deserializeCovering(new
InputStreamReader(fileSystem.open(path))))
	  }
}	
}




--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Setup-an-huge-Unserializable-Object-in-a-mapper-tp14817p14899.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
For additional commands, e-mail: user-help@spark.apache.org


Re: Setup an huge Unserializable Object in a mapper

Posted by Sean Owen <so...@cloudera.com>.
I think this was covered in this thread last week:
https://www.mail-archive.com/user@spark.apache.org/msg10493.html

Try a singleton pattern to call this once per JVM. That only makes
much sense if this object is immutable.

On Mon, Sep 22, 2014 at 5:11 PM, matthes <md...@sensenetworks.com> wrote:
> Hello everybody!
>
> I’m newbe in spark and I hope my problem is solvable!
> I need to setup an instance which I want to use is a mapper function. The
> problem is it is not Serializable and the broadcast function is no option
> for me. The Instance can become very huge (e.g. 1GB-10GB). Is there a way to
> setup the getTree function only onetime per prozess like in hadoop. Because
> at the moment it will be called for every partition and then I ran out of
> memory. The second question is, is there also a secure way to limit the
> tasks of mapper that I will never get more as the defined limit?
> If this way is totally wrong, please let me know. I’m open for any ideas.
>
> My first try is:
>
> val countresult = file.mapPartitions { valueIterator =>
>
>         val s2tree = getTree(bcTreefilename.value)
>
>         valueIterator.map { x =>
>           val split = x.split("\t")
>           val result: String = ""
>           val key = split(1)
>           var value = CountContainer(split(3).toInt)
>
>           if (s2tree.lookupContainingCellsSimple(new
> S2CellId(split(2).toLong))) {
>             value.exposureCnt = value.totalCnt
>           }
>
>           (key, value)
>         }
>       }.reduceByKey{ (x,y) => x.add(y); x}.cache()
>
> Best,
>
> Matthias
>
>
>
>
> --
> View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Setup-an-huge-Unserializable-Object-in-a-mapper-tp14817.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
> For additional commands, e-mail: user-help@spark.apache.org
>

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
For additional commands, e-mail: user-help@spark.apache.org