Posted to user@spark.apache.org by Akhil Das <ak...@sigmoidanalytics.com> on 2015/03/12 07:08:58 UTC

Re: Taking a lot of time to write a ~500MB data into files/cassandra

Awesome. :)

Thanks
Best Regards

On Thu, Mar 12, 2015 at 4:04 AM, Varad Meru <va...@gmail.com> wrote:

> Thanks a lot, Akhil, for your suggestions. I was able to improve the
> performance considerably. Apologies for the late reply.
>
> On Mon, Feb 23, 2015 at 2:09 AM, Akhil Das <ak...@sigmoidanalytics.com>
> wrote:
>
>> Only 1 task is doing the job, which is why it's slow. You can repartition
>> the data before doing the write. Also, how are you defining your Spark
>> master? Make sure you give it at least 10 threads to process with (by
>> setting the master to local[10]).
>>
>> Thanks
>> Best Regards
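
A minimal sketch of the suggestion above, assuming the Spark Cassandra
Connector is on the classpath and Cassandra is listening on 127.0.0.1 as in
the log further down. The object name, app name, sample row, and the partition
count of 10 are illustrative assumptions, not values from the thread; the
keyspace, table, and column names are the ones in the original post.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
import com.datastax.spark.connector._

object WriteIndexSketch {
  // Row shape described in the original post:
  // (token, collection frequency, idf, counts per URL ID, tf-idf per URL ID)
  type IndexRow = (String, Int, Double, Map[Int, Int], Map[Int, Double])

  // Spread the rows over several partitions so that the write runs as
  // several tasks instead of one.
  def saveIndex(finalRDD: RDD[IndexRow], numPartitions: Int): Unit =
    finalRDD
      .repartition(numPartitions)
      .saveToCassandra("finder_keyspace", "inverted_index",
        SomeColumns("token_text", "collection_freq", "idf", "count_map",
          "tfidf_map"))

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("inverted-index-write") // hypothetical name
      .setMaster("local[10]")             // 10 worker threads in local mode
      .set("spark.cassandra.connection.host", "127.0.0.1")
    val sc = new SparkContext(conf)
    try {
      // Placeholder data standing in for the finalRDD built in the post.
      val finalRDD: RDD[IndexRow] = sc.parallelize(Seq(
        ("spark", 3, 0.5, Map(1 -> 2, 2 -> 1), Map(1 -> 1.0, 2 -> 0.5))))
      saveIndex(finalRDD, numPartitions = 10)
    } finally {
      sc.stop()
    }
  }
}
```

Repartitioning right before the write only parallelises the write itself. If
the single task is also spilling during the shuffle, as the log suggests, it
may help to pass a partition count to the shuffle operations as well
(reduceByKey and groupBy both accept a numPartitions argument).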
>>
>> On Mon, Feb 23, 2015 at 3:28 PM, Varad Meru <va...@gmail.com> wrote:
>>
>>> Hi all,
>>>
>>> I am creating an inverted index, with tf-idf, for a simple indexing
>>> project using the code given below. Creating the index for roughly 85K
>>> distinct URLs took ~11 minutes. I have attached the relevant images.
>>> The problem I am facing is the final write of the index. I have tried
>>> writing both to a text file and to Cassandra, and it takes a lot of
>>> time to write just a couple of hundred MB of data: more than 2 hours.
>>> I have also copied some of the logging statements generated by Spark.
>>>
>>> I am running on a 15-inch MacBook Pro (quad-core i7 at 2.4 GHz, 16 GB
>>> RAM) in local mode.
>>>
>>> I am writing using the following code -
>>> // finalRDD.saveAsTextFile("/Users/varadmeru/index_output")
>>> val savingToCassandra = finalRDD.saveToCassandra("finder_keyspace",
>>> "inverted_index",
>>>     SomeColumns("token_text", "collection_freq", "idf", "count_map",
>>> "tfidf_map"))
>>>
>>> Could you please let me know if this is a known issue, a problem with
>>> my code, or something that happens when running in local mode?
>>>
>>> Thanks in advance.
>>>
>>> Regards,
>>> Varad
>>>
>>>
>>> PS: Code and Images attached.
>>>
>>> ---
>>> My Source is a Cassandra Table -
>>> val tab = sc.cassandraTable("probe_keyspace", "probe_corpus")
>>> val idTextRDD = tab.select("url_id", "page_text")
>>> val idTextProjectionTableRDD = idTextRDD.map { cassandraRow => (
>>> cassandraRow.get[Int]("url_id"), cassandraRow.get[String]("page_text"))
>>> }
>>>
>>> The finalRDD is an RDD with a collection of 5-tuples with the structure
>>> <String, Int, Double, Map<Int, Int>, Map<Int, Double>>.  -
>>>
>>> val finalRDD = new PairRDDFunctions(idTextProjectionTableRDD.flatMap {
>>>       case (urlId, pageText) => pageText.split("""\W+""")
>>>         // Use .zipWithIndex
>>>         .filter { x => isNotAStopWord(x) }
>>>         .map { word => (word.toLowerCase()) }
>>>         // Apply the Porter stemmer.
>>>         .map { word => PorterStemmer.stem(word) }
>>>         // After stemming, some words can become empty.
>>>         .filter { word => !word.isEmpty() }
>>>         .map { word => (word, urlId) }
>>>     }.map {
>>>       case (word, urlId) => ((word, urlId), 1)
>>>     })
>>>       .reduceByKey {
>>>         case (sumFromLeft, count) => sumFromLeft + count
>>>       }.map {
>>>         case ((word, urlId), count) => (word, (urlId, count))
>>>       }.groupBy {
>>>         case (word, (urlId, count)) => word
>>>       }.map {
>>>         case tupleWordUrlIdCount =>
>>>           (tupleWordUrlIdCount._1, // Token
>>>             // Collection frequency
>>>             tupleWordUrlIdCount._2.toList.map(_._2).map(_._2).sum,
>>>             // IDF
>>>             math.log(totalNumberOfDocuments /
>>>               tupleWordUrlIdCount._2.toList.size),
>>>             // Counts per URL ID
>>>             tupleWordUrlIdCount._2.toList.map(tupleUrlIdCount =>
>>>               tupleUrlIdCount._2).toMap,
>>>             // TF-IDF per URL ID
>>>             tupleWordUrlIdCount._2.toList.map(tupleUrlIdCount =>
>>>               (tupleUrlIdCount._2._1, tupleUrlIdCount._2._2 * math.log(
>>>                 totalNumberOfDocuments /
>>>                   tupleWordUrlIdCount._2.toList.size))).toMap)
>>>       }
>>>
>>>
>>> 15/02/22 23:47:53 INFO Executor: Running task 0.0 in stage 3.0 (TID 3)
>>> 15/02/22 23:47:53 INFO ShuffleBlockFetcherIterator: Getting 1 non-empty
>>> blocks out of 1 blocks
>>> 15/02/22 23:47:53 INFO ShuffleBlockFetcherIterator: Started 0 remote
>>> fetches in 1 ms
>>> 15/02/22 23:47:55 INFO ExternalAppendOnlyMap: Thread 111 spilling
>>> in-memory map of 1089.6 MB to disk (1 time so far)
>>> 15/02/22 23:48:00 INFO ExternalAppendOnlyMap: Thread 111 spilling
>>> in-memory map of 1111.2 MB to disk (2 times so far)
>>> 15/02/22 23:48:04 INFO ExternalAppendOnlyMap: Thread 111 spilling
>>> in-memory map of 1058.1 MB to disk (3 times so far)
>>> 15/02/22 23:48:09 INFO ExternalAppendOnlyMap: Thread 111 spilling
>>> in-memory map of 1113.2 MB to disk (4 times so far)
>>> 15/02/22 23:48:13 INFO ExternalAppendOnlyMap: Thread 111 spilling
>>> in-memory map of 1103.0 MB to disk (5 times so far)
>>> 15/02/22 23:48:18 INFO ExternalAppendOnlyMap: Thread 111 spilling
>>> in-memory map of 1090.8 MB to disk (6 times so far)
>>> 15/02/22 23:48:23 INFO ExternalAppendOnlyMap: Thread 111 spilling
>>> in-memory map of 1098.7 MB to disk (7 times so far)
>>> 15/02/22 23:48:27 INFO ExternalAppendOnlyMap: Thread 111 spilling
>>> in-memory map of 1111.6 MB to disk (8 times so far)
>>> 15/02/22 23:48:32 INFO ExternalAppendOnlyMap: Thread 111 spilling
>>> in-memory map of 1090.4 MB to disk (9 times so far)
>>> 15/02/22 23:48:36 INFO ExternalAppendOnlyMap: Thread 111 spilling
>>> in-memory map of 1103.5 MB to disk (10 times so far)
>>> 15/02/22 23:48:41 INFO ExternalAppendOnlyMap: Thread 111 spilling
>>> in-memory map of 1200.5 MB to disk (11 times so far)
>>> 15/02/22 23:48:44 INFO Cluster: New Cassandra host /127.0.0.1:9042 added
>>> 15/02/22 23:48:44 INFO CassandraConnector: Connected to Cassandra
>>> cluster: Test Cluster
>>> 15/02/22 23:48:44 INFO LocalNodeFirstLoadBalancingPolicy: Adding host
>>> 127.0.0.1 (datacenter1)
>>> 15/02/22 23:48:44 INFO LocalNodeFirstLoadBalancingPolicy: Adding host
>>> 127.0.0.1 (datacenter1)
>>> 15/02/23 01:08:30 INFO BlockManager: Removing broadcast 2
>>> 15/02/23 01:08:30 INFO BlockManager: Removing block broadcast_2_piece0
>>> 15/02/23 01:08:30 INFO MemoryStore: Block broadcast_2_piece0 of size
>>> 1907 dropped from memory (free 2061627049)
>>> 15/02/23 01:08:30 INFO BlockManagerInfo: Removed broadcast_2_piece0 on
>>> localhost:63675 in memory (size: 1907.0 B, free: 1966.1 MB)
>>> 15/02/23 01:08:30 INFO BlockManagerMaster: Updated info of block
>>> broadcast_2_piece0
>>> 15/02/23 01:08:30 INFO BlockManager: Removing block broadcast_2
>>> 15/02/23 01:08:30 INFO MemoryStore: Block broadcast_2 of size 2736
>>> dropped from memory (free 2061629785)
>>> 15/02/23 01:08:30 INFO ContextCleaner: Cleaned broadcast 2
>>> 15/02/23 01:49:53 INFO TableWriter: Wrote 185559 rows in 185559 batches
>>> to finder_keyspace.inverted_index in 7269.084 s.
>>> 15/02/23 01:49:53 INFO Executor: Finished task 0.0 in stage 3.0 (TID 3).
>>> 824 bytes result sent to driver
>>> 15/02/23 01:49:53 INFO TaskSetManager: Finished task 0.0 in stage 3.0
>>> (TID 3) in 7320392 ms on localhost (1/1)
>>> 15/02/23 01:49:53 INFO TaskSchedulerImpl: Removed TaskSet 3.0, whose
>>> tasks have all completed, from pool
>>> 15/02/23 01:49:53 INFO DAGScheduler: Stage 3 (runJob at
>>> RDDFunctions.scala:24) finished in 7320.394 s
>>> 15/02/23 01:49:53 INFO DAGScheduler: Job 1 finished: runJob at
>>> RDDFunctions.scala:24, took 6068.654338 s
>>> 15/02/23 01:49:54 INFO CassandraConnector: Disconnected from Cassandra
>>> cluster: Test Cluster
>>>
>>>
>>> Varad Meru
>>> Graduate Student
>>> UC Irvine
>>>
>>
>>
>
>
> --
> Varad Meru
> @vrdmr <https://twitter.com/vrdmr>
> about.me/vrdmr
>