Posted to user@spark.apache.org by ๏̯͡๏ <ÐΞ€ρ@Ҝ>,
de...@gmail.com on 2015/07/14 05:34:14 UTC
How to speed up Spark process
I have 100 MB of Avro data, and repartition(307) is taking forever.
2. val x = input.repartition(7907).map( {k1,k2,k3,k4}, {inputRecord} )
3. val quantiles = x.map( {k1,k2,k3,k4}, TDigest(inputRecord).asBytes ).reduceByKey() [ This was groupBy earlier ]
4. x.join(quantiles).coalesce(100).writeInAvro
Attached is the full Scala code.
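Steps 2-4 can be followed without a cluster. The sketch below runs the same idea on local collections: it keys each record by its dimension values, computes one quantile per key, and then joins that cap back onto every record. It illustrates the technique under stated assumptions and is not the attached code. `Rec`, `CapOutliers`, `quantile` and `cap` are hypothetical names, and an exact nearest-rank quantile replaces the TDigest estimate.

```scala
object CapOutliers {
  // Hypothetical stand-in for MasterPrimeRecord: dimension values plus the measured field.
  final case class Rec(dims: Map[String, String], fpPaidGMB: Double)

  // Exact nearest-rank quantile of a non-empty sample, for 0 < q <= 1
  // (swapped in for the TDigest estimate used in the thread).
  def quantile(values: Seq[Double], q: Double): Double = {
    val sorted = values.toArray
    java.util.Arrays.sort(sorted) // ascending numeric sort of a primitive Array[Double]
    val rank = math.max(math.ceil(q * sorted.length).toInt, 1)
    sorted(rank - 1)
  }

  def cap(records: Seq[Rec], dimensions: Seq[String], q: Double): Seq[Rec] = {
    // Step 2: key each record by its dimension values, e.g. "_0_15".
    def key(r: Rec): String = dimensions.map(d => "_" + r.dims(d)).mkString

    // Step 3: one cap per key (the RDD version merges per-record digests with reduceByKey).
    val caps: Map[String, Double] =
      records.groupBy(key).map { case (k, rs) => k -> quantile(rs.map(_.fpPaidGMB), q) }

    // Step 4: join each record with its key's cap and clamp the value.
    records.map(r => r.copy(fpPaidGMB = math.min(r.fpPaidGMB, caps(key(r)))))
  }
}
```

On an RDD, the same steps map onto map, reduceByKey and join, as in the thread. The exact quantile here has to hold every value for a key in memory, and avoiding that is the reason for using a mergeable digest instead.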
I have a 340-node YARN cluster with 14 GB of RAM on each node, and the input data is
just 100 MB. (Hadoop takes 2.5 hours on a 1 TB dataset.)
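The figures above are enough for a rough estimate. This minimal sketch assumes that the 100 MB (read as MiB) is spread evenly over the partitions and that each of the 330 single-core executors runs one task at a time. `PartitionMath` and its methods are hypothetical names. The estimate only covers the input as stored on disk. It does not account for how much the data grows after deserialization.

```scala
object PartitionMath {
  val inputBytes: Long = 100L * 1024 * 1024 // "100 MB" of Avro input, taken as MiB
  val taskSlots: Int = 330 * 1              // --num-executors 330 x --executor-cores 1

  // Average bytes per partition if the input were spread evenly (ignores key skew
  // and any growth from deserializing Avro into objects).
  def bytesPerPartition(partitions: Int): Long = inputBytes / partitions

  // Number of scheduling "waves" when every slot runs one task at a time.
  def waves(partitions: Int): Int = (partitions + taskSlots - 1) / taskSlots
}
```

For example, `bytesPerPartition(7907)` returns 13261 bytes and `waves(7907)` returns 24. For 307 partitions, and for the 76 tried later in the thread, each partition fits in a single wave.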
./bin/spark-submit -v --master yarn-cluster \
  --jars /apache/hadoop-2.4.1-2.1.3.0-2-EBAY/share/hadoop/hdfs/hadoop-hdfs-2.4.1-EBAY-2.jar,/home/dvasthimal/spark1.4/lib/spark_reporting_dep_only-1.0-SNAPSHOT.jar \
  --num-executors 330 --driver-memory 14g \
  --driver-java-options "-XX:MaxPermSize=512M -Xmx4096M -Xms4096M -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps" \
  --executor-memory 14g --executor-cores 1 \
  --queue hdmi-others \
  --class com.ebay.ep.poc.spark.reporting.SparkApp \
  /home/dvasthimal/spark1.4/lib/spark_reporting-1.0-SNAPSHOT.jar \
  startDate=2015-06-20 endDate=2015-06-21 \
  input=/apps/hdmi-prod/b_um/epdatasets/exptsession subcommand=ppwmasterprime \
  output=/user/dvasthimal/epdatasets/ppwmasterprime buffersize=128 \
  maxbuffersize=1068 maxResultSize=200G
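A quick check on this command is to compute how large a YARN container each 14g executor requests. This minimal sketch assumes the Spark 1.4 default for `spark.yarn.executor.memoryOverhead`, which is 10% of executor memory with a 384 MB floor, and it treats "14G" per node as 14 × 1024 MB. Whether YARN can grant the request depends on the cluster's container limits, and the thread does not show those. `ContainerMath` is a hypothetical name.

```scala
object ContainerMath {
  // Assumed defaults: overhead = max(10% of executor memory, 384 MB).
  val OverheadFactor = 0.10
  val OverheadMinMb = 384

  def overheadMb(executorMemoryMb: Int): Int =
    math.max((OverheadFactor * executorMemoryMb).toInt, OverheadMinMb)

  // Memory requested from YARN per executor container (YARN may round it up further).
  def containerMb(executorMemoryMb: Int): Int =
    executorMemoryMb + overheadMb(executorMemoryMb)
}
```

Under these assumptions, `containerMb(14 * 1024)` comes to 15769 MB, which is more than the 14336 MB of RAM on each node.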
I see this in the stdout of the task on that executor:
15/07/13 19:58:48 WARN hdfs.BlockReaderLocal: The short-circuit local reads feature cannot be used because libhadoop cannot be loaded.
15/07/13 20:00:08 INFO collection.ExternalSorter: Thread 47 spilling in-memory map of 2.2 GB to disk (1 time so far)
15/07/13 20:01:31 INFO collection.ExternalSorter: Thread 47 spilling in-memory map of 2.2 GB to disk (2 times so far)
15/07/13 20:03:07 INFO collection.ExternalSorter: Thread 47 spilling in-memory map of 2.2 GB to disk (3 times so far)
15/07/13 20:04:32 INFO collection.ExternalSorter: Thread 47 spilling in-memory map of 2.2 GB to disk (4 times so far)
15/07/13 20:06:21 INFO collection.ExternalSorter: Thread 47 spilling in-memory map of 2.2 GB to disk (5 times so far)
15/07/13 20:08:09 INFO collection.ExternalSorter: Thread 47 spilling in-memory map of 2.2 GB to disk (6 times so far)
15/07/13 20:09:51 INFO collection.ExternalSorter: Thread 47 spilling in-memory map of 2.2 GB to disk (7 times so far)
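The spill lines can be turned into a rough rate. This small sketch parses the logged timestamps (`SpillMath` is a hypothetical name). It shows one thread spilling about every one and a half minutes, each time with a 2.2 GB in-memory map, which adds up to seven maps of about 15.4 GB in total for a job whose whole input is 100 MB. The log does not explain why the map grew that large.

```scala
import java.time.{Duration, LocalTime}

object SpillMath {
  // Spill timestamps copied from the ExternalSorter lines above (all on the same day).
  val spills: Seq[LocalTime] = Seq("20:00:08", "20:01:31", "20:03:07", "20:04:32",
    "20:06:21", "20:08:09", "20:09:51").map(s => LocalTime.parse(s))

  // Seconds between consecutive spills.
  val intervals: Seq[Long] =
    spills.zip(spills.tail).map { case (a, b) => Duration.between(a, b).getSeconds }

  // In-memory map size reported at each spill.
  val mapSizeGb = 2.2
  def totalSpilledGb: Double = spills.size * mapSizeGb
}
```

The intervals come out to 83, 96, 85, 109, 108 and 102 seconds, an average of about 97 s.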
Also attached is the thread dump.
--
Deepak
Re: How to speed up Spark process
Posted by ๏̯͡๏ <ÐΞ€ρ@Ҝ>,
de...@gmail.com.
Are there any solutions for this exception?
org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 1
at org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$1.apply(MapOutputTracker.scala:389)
at org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$1.apply(MapOutputTracker.scala:386)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
On Mon, Jul 13, 2015 at 11:34 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) <de...@gmail.com> wrote:
> With 17 as the repartitioning argument,
> genericRecordsAndKeys.persist(StorageLevel.MEMORY_AND_DISK) throws this exception:
>
> 7/13 23:26:36 INFO yarn.ApplicationMaster: Final app status: FAILED,
> exitCode: 15, (reason: User class threw exception:
> org.apache.spark.SparkException: Job aborted due to stage failure: Task 14
> in stage 2.0 failed 4 times, most recent failure: Lost task 14.3 in stage
> 2.0 (TID 37, phxaishdc9dn0725.phx.ebay.com): java.lang.RuntimeException:
> java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
> at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:828)
> at org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:125)
> at org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:113)
> at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1285)
> at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:127)
> at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:134)
> at org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:509)
> at org.apache.spark.storage.BlockManager.getBlockData(BlockManager.scala:300)
> at org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
> at org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
> at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
> at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
> at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
> at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
> at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
> at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
> at org.apache.spark.network.netty.NettyBlockRpcServer.receive(NettyBlockRpcServer.scala:57)
> at org.apache.spark.network.server.TransportRequestHandler.processRpcRequest(TransportRequestHandler.java:114)
--
Deepak
Re: How to speed up Spark process
Posted by ๏̯͡๏ <ÐΞ€ρ@Ҝ>,
de...@gmail.com.
With 17 as the repartitioning argument,
genericRecordsAndKeys.persist(StorageLevel.MEMORY_AND_DISK) throws this exception:
7/13 23:26:36 INFO yarn.ApplicationMaster: Final app status: FAILED,
exitCode: 15, (reason: User class threw exception:
org.apache.spark.SparkException: Job aborted due to stage failure: Task 14
in stage 2.0 failed 4 times, most recent failure: Lost task 14.3 in stage
2.0 (TID 37, phxaishdc9dn0725.phx.ebay.com): java.lang.RuntimeException:
java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:828)
at org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:125)
at org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:113)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1285)
at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:127)
at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:134)
at org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:509)
at org.apache.spark.storage.BlockManager.getBlockData(BlockManager.scala:300)
at org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
at org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
at org.apache.spark.network.netty.NettyBlockRpcServer.receive(NettyBlockRpcServer.scala:57)
at org.apache.spark.network.server.TransportRequestHandler.processRpcRequest(TransportRequestHandler.java:114)
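Both this error and the SPARK-5928 trace quoted in this thread hit the same limit: a single block is larger than a Java `Int` can index, which is 2147483647 bytes. As a rough sketch, assume the data is spread evenly across partitions (skewed keys break this assumption). The smallest partition count that keeps every block under the limit is then a ceiling division. `BlockMath` and its methods are hypothetical names. The 3021252889-byte frame comes from the SPARK-5928 trace, because the DiskStore error does not report the block size.

```scala
object BlockMath {
  val MaxBlockBytes: Long = Int.MaxValue.toLong // 2147483647, the limit in both errors

  def exceedsLimit(blockBytes: Long): Boolean = blockBytes > MaxBlockBytes

  // Smallest partition count that keeps evenly sized blocks at or under the limit.
  def minPartitions(totalBytes: Long): Long =
    (totalBytes + MaxBlockBytes - 1) / MaxBlockBytes
}
```

For the SPARK-5928 frame, `minPartitions(3021252889L)` is 2. In practice you would stay well above this floor, because partitions are rarely equal in size.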
On Mon, Jul 13, 2015 at 10:37 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) <de...@gmail.com> wrote:
> I stopped at 35 partitions, as the repartition takes around 12-14 minutes.
> I cached an RDD because it is used in the next two tasks; however, caching
> slowed the process down.
>
> Code:
>
> val genericRecordsAndKeys = inputRecords.map {
>   record =>
>     val rec = new MasterPrimeRecord(detail, record)
>     var keyToOutput = new StringBuilder("");
>     dimensions.foreach {
>       dim =>
>         keyToOutput = keyToOutput.append("_" + rec.get(dim).toString)
>     }
>     (keyToOutput.toString, rec)
> }
>
> genericRecordsAndKeys.cache
>
> val quantiles = genericRecordsAndKeys
>   .map {
>     case (keyToOutput, rec) =>
>       var digest: TDigest = TDigest.createAvlTreeDigest(10)
>       val fpPaidGMB = rec.get("fpPaidGMB").asInstanceOf[Double]
>       digest.add(fpPaidGMB)
>       var bbuf: ByteBuffer = ByteBuffer.allocate(digest.byteSize());
>       digest.asBytes(bbuf);
>       (keyToOutput.toString, bbuf.array())
>   }.reduceByKey {
>     case (v1, v2) =>
>       var tree1 = AVLTreeDigest.fromBytes(ByteBuffer.wrap(v1.asInstanceOf[scala.Array[Byte]]))
>       var tree2 = AVLTreeDigest.fromBytes(ByteBuffer.wrap(v2.asInstanceOf[scala.Array[Byte]]))
>       tree1.add(tree2)
>       tree1.compress()
>       var bbuf: ByteBuffer = ByteBuffer.allocate(tree1.byteSize())
>       tree1.asBytes(bbuf)
>       bbuf.array
>   }
>
> val outputRecords: RDD[(AvroKey[MasterPrimeRecord], NullWritable)] =
>   genericRecordsAndKeys.join(quantiles).map {
>     case (k, v) =>
>       val masterPrimeRec = v._1
>       val mergedTree = AVLTreeDigest.fromBytes(ByteBuffer.wrap(v._2))
>       val capVal = mergedTree.quantile(0.999)
>       if (masterPrimeRec.get("fpPaidGMB").asInstanceOf[Double] > capVal) {
>         masterPrimeRec.put("fpPaidGMB", capVal)
>       }
>       val wrap = new AvroKey[MasterPrimeRecord](masterPrimeRec)
>       (wrap, NullWritable.get)
>   }
--
Deepak
Re: How to speed up Spark process
Posted by ๏̯͡๏ <ÐΞ€ρ@Ҝ>,
de...@gmail.com.
I stopped at 35 partitions, as the repartition takes around 12-14 minutes.
I cached an RDD because it is used in the next two tasks; however, caching
slowed the process down.
Code:
// Imports this excerpt relies on; MasterPrimeRecord, detail, dimensions and
// inputRecords are defined elsewhere in the attached code.
import java.nio.ByteBuffer
import com.tdunning.math.stats.{AVLTreeDigest, TDigest}
import org.apache.avro.mapred.AvroKey
import org.apache.hadoop.io.NullWritable
import org.apache.spark.rdd.RDD

// Key every record by the concatenated values of its dimensions.
val genericRecordsAndKeys = inputRecords.map { record =>
  val rec = new MasterPrimeRecord(detail, record)
  val keyToOutput = new StringBuilder
  dimensions.foreach { dim =>
    keyToOutput.append("_" + rec.get(dim).toString)
  }
  (keyToOutput.toString, rec)
}
genericRecordsAndKeys.cache()

// One single-value digest per record, merged per key with reduceByKey.
val quantiles = genericRecordsAndKeys.map { case (keyToOutput, rec) =>
  val digest: TDigest = TDigest.createAvlTreeDigest(10)
  digest.add(rec.get("fpPaidGMB").asInstanceOf[Double])
  val bbuf = ByteBuffer.allocate(digest.byteSize())
  digest.asBytes(bbuf)
  (keyToOutput, bbuf.array())
}.reduceByKey { (v1, v2) =>
  val tree1 = AVLTreeDigest.fromBytes(ByteBuffer.wrap(v1))
  val tree2 = AVLTreeDigest.fromBytes(ByteBuffer.wrap(v2))
  tree1.add(tree2)
  tree1.compress()
  val bbuf = ByteBuffer.allocate(tree1.byteSize())
  tree1.asBytes(bbuf)
  bbuf.array
}

// Join each record with its key's merged digest and cap fpPaidGMB at the 99.9th percentile.
val outputRecords: RDD[(AvroKey[MasterPrimeRecord], NullWritable)] =
  genericRecordsAndKeys.join(quantiles).map { case (_, (masterPrimeRec, digestBytes)) =>
    val mergedTree = AVLTreeDigest.fromBytes(ByteBuffer.wrap(digestBytes))
    val capVal = mergedTree.quantile(0.999)
    if (masterPrimeRec.get("fpPaidGMB").asInstanceOf[Double] > capVal) {
      masterPrimeRec.put("fpPaidGMB", capVal)
    }
    (new AvroKey[MasterPrimeRecord](masterPrimeRec), NullWritable.get)
  }
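The key-building loop above appends `"_" + value` for each dimension in order. This minimal plain-Scala sketch uses a `Map` as a stand-in for `MasterPrimeRecord`, which is a hypothetical simplification. It shows that `mkString` builds the same key without a mutable builder. `KeyBuilding` and its methods are illustrative names only.

```scala
object KeyBuilding {
  // Hypothetical stand-in for a record: dimension name -> value.
  type Record = Map[String, Any]

  // Mirrors the StringBuilder loop: "_" + value for every dimension, in order.
  def keyWithBuilder(rec: Record, dimensions: Seq[String]): String = {
    val sb = new StringBuilder
    dimensions.foreach(dim => sb.append("_" + rec(dim).toString))
    sb.toString
  }

  // The same key as a single expression.
  def keyWithMkString(rec: Record, dimensions: Seq[String]): String =
    dimensions.map(dim => "_" + rec(dim)).mkString
}
```

For example, `keyWithMkString(Map("site" -> 0, "cat" -> "x"), Seq("site", "cat"))` returns `"_0_x"`, and so does the builder version.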
On Mon, Jul 13, 2015 at 9:48 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) <de...@gmail.com> wrote:
> My guess worked fine. The repartition took approximately 1/4 of the time once
> I reduced the number of partitions,
> and the rest of the process took about 1/4 more time, but that is OK.
--
Deepak
Re: How to speed up Spark process
Posted by ๏̯͡๏ <ÐΞ€ρ@Ҝ>,
de...@gmail.com.
My guess worked fine. The repartition took approximately 1/4 of the time once I
reduced the number of partitions,
and the rest of the process took about 1/4 more time, but that is OK.
On Mon, Jul 13, 2015 at 9:46 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) <de...@gmail.com> wrote:
> I reduced the number of partitions to a quarter (76) in order to cut the
> time to a quarter (from 33 to 8), but the repartition is still running beyond
> 15 mins.
>
> @Nirmal: clicking on details shows the code lines but does not show why it is
> slow. I know that repartition is slow and I want to speed it up.
>
> @Sharma: I have seen that increasing the cores speeds up repartition, but it
> slows down the rest of the stages in the job plan.
>
> I need some logical explanation and math to know beforehand; otherwise, with
> Spark I am always firing in the dark. Spark has been depressingly lackluster
> so far (the join use case, and now a simple outlier detection using
> TDigest).
>
> On Mon, Jul 13, 2015 at 9:37 PM, Aniruddh Sharma <as...@gmail.com>
> wrote:
>
>> Hi Deepak
>>
>> Not 100% sure, but please try increasing --executor-cores to twice
>> the number of physical cores on your machine.
>>
>> Thanks and Regards
>> Aniruddh
>>
>> On Tue, Jul 14, 2015 at 9:49 AM, ÐΞ€ρ@Ҝ (๏̯͡๏) <de...@gmail.com>
>> wrote:
>>
>>> It's been 30 minutes and the partitioner still has not completed; it's
>>> taking forever.
>>>
>>> Without repartition, I see this error:
>>> https://issues.apache.org/jira/browse/SPARK-5928
>>>
>>>
>>> FetchFailed(BlockManagerId(1, imran-2.ent.cloudera.com, 55028), shuffleId=1, mapId=0, reduceId=0, message=
>>> org.apache.spark.shuffle.FetchFailedException: Adjusted frame length exceeds 2147483647: 3021252889 - discarded
>>> at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.org$apache$spark$shuffle$hash$BlockStoreShuffleFetcher$$unpackBlock$1(BlockStoreShuffleFetcher.scala:67)
>>> at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:83)
>>> at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:83)
>>> at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
>>> at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
>>>
>>>
>>> --
>>> Deepak
>>>
>>>
>>
>
>
> --
> Deepak
>
>
--
Deepak
Re: How to speed up Spark process
Posted by ๏̯͡๏ <ÐΞ€ρ@Ҝ>,
de...@gmail.com.
I reduced the number of partitions to a quarter (from 307 to 76) in order to
cut the time to a quarter (from 33 to 8 minutes), but the repartition is still
running after 15 minutes.
@Nirmal
Clicking +details shows the code lines but does not show why the stage is
slow. I already know that repartition is slow; I want to speed it up.
@Sharma
I have seen that increasing the cores speeds up the repartition, but it slows
down the rest of the stages in the job plan.
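The observation that more cores per executor speeds up the repartition but slows later stages can be framed with a rough memory estimate. This is a minimal sketch under several assumptions:

- It uses the Spark 1.x legacy settings `spark.shuffle.memoryFraction = 0.2` and `spark.shuffle.safetyFraction = 0.8`.
- The JVM max heap roughly equals `--executor-memory`.
- The shuffle memory pool is shared about evenly by the tasks running at once in one executor.

`ShuffleMemoryModel` is a hypothetical name.

With 14g and one core, the estimate is about 2.24 GiB per task. That is in line with the "spilling in-memory map of 2.2 GB" lines in the executor log of the original post. With two cores, each task would get roughly half that and could spill more often.

```scala
// Rough per-task shuffle memory estimate (hypothetical helper, not Spark code).
// Assumes Spark 1.x defaults for the two fractions below and an even split
// of the executor's shuffle pool among concurrently running tasks.
object ShuffleMemoryModel {
  val MemoryFraction = 0.2 // assumed spark.shuffle.memoryFraction (Spark 1.x default)
  val SafetyFraction = 0.8 // assumed spark.shuffle.safetyFraction (Spark 1.x default)

  /** Shuffle memory pool of one executor, in bytes. */
  def poolBytes(executorHeapBytes: Long): Long =
    (executorHeapBytes * MemoryFraction * SafetyFraction).toLong

  /** Approximate share of one task when `concurrentTasks` run in the same executor. */
  def perTaskBytes(executorHeapBytes: Long, concurrentTasks: Int): Long =
    poolBytes(executorHeapBytes) / concurrentTasks

  def main(args: Array[String]): Unit = {
    val gib = 1024L * 1024 * 1024
    val heap = 14L * gib // --executor-memory 14g
    for (cores <- Seq(1, 2)) {
      val perTask = perTaskBytes(heap, cores).toDouble / gib
      println(f"executor-cores=$cores per-task shuffle memory ~ $perTask%.2f GiB")
    }
  }
}
```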
I need some logical explanation and math to know this beforehand; otherwise,
with Spark I am always firing in the dark. Spark has been depressingly
lackluster so far (the join use case, and now a simple outlier detection
using TDigest).
On Mon, Jul 13, 2015 at 9:37 PM, Aniruddh Sharma <as...@gmail.com>
wrote:
> Hi Deepak
>
> Not 100% sure, but please try increasing --executor-cores to twice the
> number of physical cores on your machine.
>
> Thanks and Regards
> Aniruddh
--
Deepak
Re: How to speed up Spark process
Posted by Aniruddh Sharma <as...@gmail.com>.
Hi Deepak
Not 100% sure, but please try increasing --executor-cores to twice the
number of physical cores on your machine.
Thanks and Regards
Aniruddh
On Tue, Jul 14, 2015 at 9:49 AM, ÐΞ€ρ@Ҝ (๏̯͡๏) <de...@gmail.com> wrote:
> It's been 30 minutes and the partitioner still has not completed; it's
> taking forever.
Re: How to speed up Spark process
Posted by Nirmal Fernando <ni...@wso2.com>.
If you click +details, you can see the code that is taking the time. Did
you already check it?
On Tue, Jul 14, 2015 at 9:56 AM, ÐΞ€ρ@Ҝ (๏̯͡๏) <de...@gmail.com> wrote:
> Job view: the other stages are fast, but the first one (repartition) takes
> 95% of the job run time.
--
Thanks & regards,
Nirmal
Associate Technical Lead - Data Technologies Team, WSO2 Inc.
Mobile: +94715779733
Blog: http://nirmalfdo.blogspot.com/
Re: How to speed up Spark process
Posted by ๏̯͡๏ <ÐΞ€ρ@Ҝ>,
de...@gmail.com.
Job view: the other stages are fast, but the first one (repartition) takes 95%
of the job run time.
On Mon, Jul 13, 2015 at 9:23 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) <de...@gmail.com> wrote:
> It completed in 32 minutes. Screenshots are attached. How do I speed it up?
--
Deepak
Re: How to speed up Spark process
Posted by ๏̯͡๏ <ÐΞ€ρ@Ҝ>,
de...@gmail.com.
It completed in 32 minutes. Screenshots are attached. How do I speed it up?
On Mon, Jul 13, 2015 at 9:19 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) <de...@gmail.com> wrote:
> It's been 30 minutes and the partitioner still has not completed; it's
> taking forever.
--
Deepak
Re: How to speed up Spark process
Posted by ๏̯͡๏ <ÐΞ€ρ@Ҝ>,
de...@gmail.com.
It's been 30 minutes and the partitioner still has not completed; it's
taking forever.
Without repartition, I see this error
(https://issues.apache.org/jira/browse/SPARK-5928):
FetchFailed(BlockManagerId(1, imran-2.ent.cloudera.com, 55028), shuffleId=1, mapId=0, reduceId=0, message=
org.apache.spark.shuffle.FetchFailedException: Adjusted frame length exceeds 2147483647: 3021252889 - discarded
at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.org$apache$spark$shuffle$hash$BlockStoreShuffleFetcher$$unpackBlock$1(BlockStoreShuffleFetcher.scala:67)
at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:83)
at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:83)
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
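The limit in this error is 2147483647 bytes (`Int.MaxValue`, about 2 GiB) for a single shuffle block fetched over the network, the problem tracked in SPARK-5928. This is a minimal sketch of the arithmetic. It assumes a block shrinks in proportion to the number of reduce partitions, which skewed keys can defeat. `FrameLimit` and its methods are hypothetical names.

```scala
// Compares a shuffle block size with the 2147483647-byte frame limit from the
// exception and estimates the factor by which reduce partitions would need to grow
// (hypothetical helper; assumes blocks shrink evenly as partitions increase).
object FrameLimit {
  val MaxFrameBytes: Long = Int.MaxValue.toLong // 2147483647, as in the exception message

  def exceedsLimit(blockBytes: Long): Boolean = blockBytes > MaxFrameBytes

  /** Smallest integer k such that blockBytes / k is at most the limit (ceiling division). */
  def partitionFactor(blockBytes: Long): Long =
    (blockBytes + MaxFrameBytes - 1) / MaxFrameBytes

  def main(args: Array[String]): Unit = {
    val observed = 3021252889L // "Adjusted frame length exceeds 2147483647: 3021252889"
    println(s"exceeds=${exceedsLimit(observed)} factor=${partitionFactor(observed)}")
  }
}
```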
On Mon, Jul 13, 2015 at 8:34 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) <de...@gmail.com> wrote:
> I have 100 MB of Avro data, and repartition(307) is taking forever.
--
Deepak