Posted to user@spark.apache.org by Sriram Ramachandrasekaran <sr...@gmail.com> on 2013/10/28 19:17:27 UTC

Data processing conventions with Spark.

Hello,

I'm trying to process text documents and build a context vector for each
term/feature in the corpus. A context vector is the vector of features that
appear "around" a term in the corpus, within a distance of x.

Now, when I run this code, the process hits a GC overhead limit error and then
an OOM while it builds this featureContextVector.

Some configuration details for reference:
Cluster type: standalone cluster
Number of nodes: 1 (master and slave on the same node)
Job memory: tried the default (512m), then 1G and 2G as well.
Data size: 12.8M documents, each 50-100 chars long - about 1.2G of raw data,
not counting any JVM overhead.

So, while it seems reasonable that the job simply needs more memory, I was
wondering why the records in the RDD don't spill over to disk to reduce the
memory pressure. I also tried changing the StorageLevel of the parent RDD (in
this case the docs RDD - see below) to DISK_ONLY, but to no effect.
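
For reference, a minimal sketch of that change (the input path and the
parseDocument helper here are placeholders, not the actual loading code):

import org.apache.spark.storage.StorageLevel

// sc is the existing SparkContext; parseDocument is assumed to turn one
// input line into a Document
val docs = sc.textFile("hdfs://.../corpus")
  .map(line => parseDocument(line))
  .persist(StorageLevel.DISK_ONLY)   // the StorageLevel change mentioned above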

1. I want to know if this is expected behavior and, if so, why.
2. When does the RDD spill-to-disk behavior kick in? Is it something we should
enable while constructing the RDD or the SparkContext? Kindly clarify.

3. On a slightly unrelated note, I also want to know if there's an elegant way
to create incremental document ids when processing documents with Spark. The
problem I face is that when I iterate over an RDD, the processing may be
distributed across nodes, so I can't have an id that is unique and
auto-incrementing across those nodes. I tried .collect().zipWithIndex(), but
that has the limitation of pulling all the data into memory, which is not
desirable for large-scale processing. Am I missing something?
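
A minimal sketch of one workaround I can think of, assuming the ids only need
to be unique across the corpus rather than strictly consecutive
(maxDocsPerPartition is an assumed upper bound on partition size):

// give each partition a disjoint id range so no collect() is needed
val maxDocsPerPartition = 10000000L
val docsWithIds = docs.mapPartitionsWithIndex { (partitionIndex, iter) =>
  iter.zipWithIndex.map { case (doc, localIndex) =>
    (partitionIndex * maxDocsPerPartition + localIndex, doc)
  }
}

The resulting ids are unique but leave gaps between partitions; making them
contiguous would need a second pass to count each partition's size first.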


Below is the snippet that does the job...

Some types/glossary:
*docs: RDD[Document], where Document is a case class holding a document plus
some metadata*
*list2FreqMap: transforms a List[T] into a Map[T, Int], where the Int value is
the number of times the key occurs in the list*
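
(For completeness, a plausible sketch of that helper - the real implementation
is omitted here:

def list2FreqMap[T](xs: List[T]): Map[T, Int] =
  xs.groupBy(identity).map { case (k, vs) => k -> vs.size }
)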


=== Snippet ===
val featureContextVector = docs.flatMap { doc =>
      val terms = doc.tokenize.toList
      // pair each term with the full term list and its position in the document
      terms.zipWithIndex.map { case (term, index) => term -> (terms, index) }
    }.groupBy(_._1)
    .map { kv =>
      (kv._1, kv._2.map(_._2))
    }.map { kv =>
      // contexts are generated based on the term, its index in the document
      // and the window size: we pick "window" items "around" the current position.
      val (term, termsIndexPairs) = kv
      val contextList = termsIndexPairs.flatMap { case (terms, index) =>
        terms.slice(indexWithinBounds(index - windowSize, terms.size), index) ++
          terms.slice(index + 1, indexWithinBounds(index + windowSize, terms.size) + 1)
      }
      val contextVectorForTerm = list2FreqMap(contextList).toIterable
      term -> contextVectorForTerm
    }



-- 
It's just about how deep your longing is!

Re: Data processing conventions with Spark.

Posted by Sriram Ramachandrasekaran <sr...@gmail.com>.
Hey folks - I was reading up some documentation on RDDs, and I see people have
clarified that RDDs do indeed get spilled to disk. Unfortunately for me, that
doesn't seem to be happening. It makes me wonder whether I need to set some
configuration for it.

Any ideas?
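
A minimal sketch of the kind of settings I mean, assuming Spark 0.8-style
configuration via Java system properties (the master URL, app name and values
are placeholders):

import org.apache.spark.SparkContext

// properties must be set before the SparkContext is created
System.setProperty("spark.executor.memory", "2g")
// fraction of the heap Spark reserves for its in-memory RDD cache
System.setProperty("spark.storage.memoryFraction", "0.3")
val sc = new SparkContext("spark://master:7077", "context-vectors")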


-- 
It's just about how deep your longing is!