Posted to user@spark.apache.org by Jon Chase <jo...@gmail.com> on 2014/12/18 14:56:04 UTC

Downloads from S3 exceedingly slow when running on spark-ec2

I'm running a very simple Spark application that downloads files from S3,
does a bit of mapping, then uploads new files.  Each file is roughly 2MB
and is gzipped.  I was running the same code on Amazon's EMR with Spark and
had no download speed issues (Amazon's EMR provides a custom
implementation of the s3n:// file system, FWIW).

When I say exceedingly slow, I mean that it takes about 2 minutes to
download and process a 2MB file (this was taking ~2 seconds on the same
instance types in Amazon's EMR).  When I download the same file from the
EC2 machine with wget or curl, it downloads in ~1 second.  I've also done
other bandwidth checks for downloads from other external hosts - no speed
problems there.

I've tried this with Spark 1.1.0 and 1.1.1.

When I take a thread dump on a worker, I typically see a lot of threads in
this state:



"Executor task launch worker-7" daemon prio=10 tid=0x00007fd174039000
nid=0x59e9 runnable [0x00007fd1f7dfb000]
   java.lang.Thread.State: RUNNABLE
at java.net.SocketInputStream.socketRead0(Native Method)
at java.net.SocketInputStream.read(SocketInputStream.java:152)
at java.net.SocketInputStream.read(SocketInputStream.java:122)
at sun.security.ssl.InputRecord.readFully(InputRecord.java:442)
at sun.security.ssl.InputRecord.read(InputRecord.java:480)
at sun.security.ssl.SSLSocketImpl.readRecord(SSLSocketImpl.java:927)
- locked <0x00000007e44dd140> (a java.lang.Object)
at sun.security.ssl.SSLSocketImpl.readDataRecord(SSLSocketImpl.java:884)
at sun.security.ssl.AppInputStream.read(AppInputStream.java:102)
- locked <0x00000007e44e1350> (a sun.security.ssl.AppInputStream)
at java.io.BufferedInputStream.fill(BufferedInputStream.java:235)
at java.io.BufferedInputStream.read(BufferedInputStream.java:254)
- locked <0x00000007e44ea800> (a java.io.BufferedInputStream)
at org.apache.commons.httpclient.HttpParser.readRawLine(HttpParser.java:78)
at org.apache.commons.httpclient.HttpParser.readLine(HttpParser.java:106)
at org.apache.commons.httpclient.HttpConnection.readLine(HttpConnection.java:1116)
at org.apache.commons.httpclient.MultiThreadedHttpConnectionManager$HttpConnectionAdapter.readLine(MultiThreadedHttpConnectionManager.java:1413)
at org.apache.commons.httpclient.HttpMethodBase.readStatusLine(HttpMethodBase.java:1973)
at org.apache.commons.httpclient.HttpMethodBase.readResponse(HttpMethodBase.java:1735)
at org.apache.commons.httpclient.HttpMethodBase.execute(HttpMethodBase.java:1098)
at org.apache.commons.httpclient.HttpMethodDirector.executeWithRetry(HttpMethodDirector.java:398)
at org.apache.commons.httpclient.HttpMethodDirector.executeMethod(HttpMethodDirector.java:171)
at org.apache.commons.httpclient.HttpClient.executeMethod(HttpClient.java:397)
at org.apache.commons.httpclient.HttpClient.executeMethod(HttpClient.java:323)
at org.jets3t.service.impl.rest.httpclient.RestS3Service.performRequest(RestS3Service.java:342)
at org.jets3t.service.impl.rest.httpclient.RestS3Service.performRestHead(RestS3Service.java:718)
at org.jets3t.service.impl.rest.httpclient.RestS3Service.getObjectImpl(RestS3Service.java:1599)
at org.jets3t.service.impl.rest.httpclient.RestS3Service.getObjectDetailsImpl(RestS3Service.java:1535)
at org.jets3t.service.S3Service.getObjectDetails(S3Service.java:1987)
at org.jets3t.service.S3Service.getObjectDetails(S3Service.java:1332)
at org.apache.hadoop.fs.s3native.Jets3tNativeFileSystemStore.retrieveMetadata(Jets3tNativeFileSystemStore.java:111)
at sun.reflect.GeneratedMethodAccessor15.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:82)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:59)
at org.apache.hadoop.fs.s3native.$Proxy6.retrieveMetadata(Unknown Source)
at org.apache.hadoop.fs.s3native.NativeS3FileSystem.getFileStatus(NativeS3FileSystem.java:330)
at org.apache.hadoop.fs.s3native.NativeS3FileSystem.mkdir(NativeS3FileSystem.java:432)
at org.apache.hadoop.fs.s3native.NativeS3FileSystem.mkdirs(NativeS3FileSystem.java:425)
at org.apache.hadoop.fs.FileSystem.mkdirs(FileSystem.java:1126)
at org.apache.hadoop.mapred.FileOutputCommitter.getWorkPath(FileOutputCommitter.java:256)
at org.apache.hadoop.mapred.FileOutputFormat.getTaskOutputPath(FileOutputFormat.java:244)
at org.apache.hadoop.mapred.TextOutputFormat.getRecordWriter(TextOutputFormat.java:126)
at org.apache.hadoop.mapred.lib.MultipleTextOutputFormat.getBaseRecordWriter(MultipleTextOutputFormat.java:44)
at org.apache.hadoop.mapred.lib.MultipleOutputFormat$1.write(MultipleOutputFormat.java:99)
at org.apache.spark.SparkHadoopWriter.write(SparkHadoopWriter.scala:94)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:986)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:974)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
at org.apache.spark.scheduler.Task.run(Task.scala:54)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)




Here's my setup and pseudocode:

Launch cluster:
./spark-ec2 -k x -i x --wait=480 -m m3.xlarge -t m3.xlarge -s 2 \
  --spot-price=.1 -r eu-west-1 \
  --master-opts=-Dspark.eventLog.enabled=true \
  launch spark-ec2


Run job (from the master):
~/spark/bin/spark-submit --class com.MyClass \
  --master spark://ec2-xx-xx-xx-xx:7077 --deploy-mode client \
  --driver-memory 6G --executor-memory 2G \
  --conf spark.eventLog.enabled=true \
  ~/my.jar --in-path 's3n://bucket/path/prefix-*' --out-path s3n://bucket/outpath


Job (Java pseudo):

    JavaPairRDD<String, LogLine> rdd = ctx.textFile(cmd.inPath)
        .map(line -> parser.parse(line))
        .filter(logLine -> targetDate.equals(logLine.timestamp))
        .keyBy(logLine -> p.partitionKeyFor(logLine));

    FileOutputFormat.setCompressOutput(jobConf, true);
    FileOutputFormat.setOutputCompressorClass(jobConf, GzipCodec.class);

    rdd.saveAsHadoopDataset(jobConf);


Unfortunately, I haven't been able to get debug logging turned on - I'm
using slf4j/logback with the commons-logging and log4j bridges.  Any
pointers for getting that turned up to DEBUG would be helpful too.
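
For concreteness, this is roughly the shape of logback.xml I'm aiming for -
the logger names are my guesses at the relevant packages:

    <configuration>
      <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
        <encoder>
          <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
        </encoder>
      </appender>

      <!-- Guessed logger names for the S3/HTTP layers -->
      <logger name="org.jets3t" level="DEBUG"/>
      <logger name="org.apache.hadoop.fs.s3native" level="DEBUG"/>
      <logger name="org.apache.commons.httpclient" level="DEBUG"/>

      <root level="INFO">
        <appender-ref ref="STDOUT"/>
      </root>
    </configuration>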


I've tried everything I can think of and am at my wit's end - any
troubleshooting suggestions would be greatly appreciated!

Re: Downloads from S3 exceedingly slow when running on spark-ec2

Posted by Nicholas Chammas <ni...@gmail.com>.
Is the operation slow every time or does it run normally if you repeat the
operation within the same app?
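
A quick way to tell is to time the same read twice in one app, e.g. (the
path here is made up):

    // Time the same S3 read twice within one application; if only the
    // first pass is slow, suspect connection setup rather than bandwidth.
    long t0 = System.currentTimeMillis();
    ctx.textFile("s3n://bucket/path/prefix-*").count();
    System.out.println("first read:  " + (System.currentTimeMillis() - t0) + " ms");

    long t1 = System.currentTimeMillis();
    ctx.textFile("s3n://bucket/path/prefix-*").count();
    System.out.println("second read: " + (System.currentTimeMillis() - t1) + " ms");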

Nick



Re: Downloads from S3 exceedingly slow when running on spark-ec2

Posted by Jon Chase <jo...@gmail.com>.
Turns out I was using the s3:// prefix (in a standalone Spark cluster).  It
was writing a LOT of block_* files to my S3 bucket, which was the cause of
the slowness.  I was coming from Amazon EMR, where Amazon's underlying FS
implementation remaps s3:// to s3n://, which doesn't use the block_*
files.
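
For anyone else who hits this, the fix was just pointing the job at s3n://
URIs instead.  A rough sketch (bucket names and credentials are
placeholders):

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaSparkContext;

    // s3n:// is the "native" S3 filesystem: one S3 object per file,
    // no block_* objects scattered across the bucket.
    JavaSparkContext ctx =
        new JavaSparkContext(new SparkConf().setAppName("s3n-example"));
    ctx.hadoopConfiguration().set("fs.s3n.awsAccessKeyId", "YOUR_ACCESS_KEY");
    ctx.hadoopConfiguration().set("fs.s3n.awsSecretAccessKey", "YOUR_SECRET_KEY");

    ctx.textFile("s3n://my-bucket/path/prefix-*")
       .saveAsTextFile("s3n://my-bucket/outpath");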


Re: Downloads from S3 exceedingly slow when running on spark-ec2

Posted by Paul Brown <pr...@mult.ifario.us>.
I would suggest checking disk IO on the nodes in your cluster and then
reading up on the limiting behaviors that accompany different kinds of EC2
storage.  Depending on how things are configured for your nodes, you may
have a local storage configuration that provides "bursty" IOPS: apparently
good performance at first, until limiting kicks in and slows the rate at
which you can write data to local storage.
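
For example, something like this on each node will show whether the local
volumes are the bottleneck (assumes the sysstat package is installed; the
test path is a placeholder):

    # Watch per-device IO in 5-second intervals; high %util with write
    # throughput that falls off over time suggests burst limiting.
    iostat -xm 5

    # Crude write-throughput check against the volume Spark writes to
    # (writes a 1GB test file; delete it afterwards).
    dd if=/dev/zero of=/mnt/spark/ddtest bs=1M count=1024 oflag=direct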


--
prb@mult.ifario.us | Multifarious, Inc. | http://mult.ifario.us/
