Posted to user@spark.apache.org by Steve Loughran <st...@hortonworks.com> on 2017/05/01 13:26:05 UTC

Re: removing columns from file

On 28 Apr 2017, at 16:10, Anubhav Agarwal <an...@gmail.com> wrote:

Are you using Spark's textFiles method? If so, go through this blog :-
http://tech.kinja.com/how-not-to-pull-from-s3-using-apache-spark-1704509219


old/dated blog post.

If you get the Hadoop 2.8 binaries on your classpath, s3a does a full directory tree listing if you give it a simple path like "s3a://bucket/events". The example in that post used a complex wildcard, which hasn't yet been sped up because it's pretty hard to do in a way that works effectively everywhere.

Having all your data in 1 dir works nicely.


Anubhav

On Mon, Apr 24, 2017 at 12:48 PM, Afshin, Bardia <Ba...@capitalone.com> wrote:
Hi there,

I have a process that downloads thousands of files from an S3 bucket, removes a set of columns from each, and uploads the result to S3.

S3 is currently not the bottleneck; having a Single Master Node Spark instance is the bottleneck. One approach is to distribute the files across multiple Spark Master Node workers, which will make it faster.

yes, with more than one worker, and provided the work can be partitioned


Question:

1. Is there a way to utilize master / slave node on Spark to distribute this downloading and processing of files – so it can, say, do 10 files at a time?


yes, they are called RDDs, DataFrames and Datasets


If you are doing all the processing on the Spark driver, then you aren't really using Spark much; you're just processing the files in Scala.

To get a dataframe

// "spark" here is your SparkSession instance
val df = spark.read.format("csv").load("s3a://bucket/data")

You now have a dataset over all the files in the directory /data in the bucket, partitioned however Spark decides (which depends on the number of workers, the compression format used and its splittability). Assuming you can configure the dataframe with the column structure, you can filter aggressively by selecting only the columns you want
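If you want to see how Spark actually split the input, a minimal sketch like the one below reports the partition count of a dataframe. The object and method names are made up for illustration; df.rdd.getNumPartitions is the only Spark call involved. Each partition is handled by one task, and how many tasks run at once depends on the cores available to your executors.

```scala
import org.apache.spark.sql.DataFrame

// Hypothetical helper, not part of Spark itself.
object PartitionInfo {
  // Number of partitions Spark chose for this DataFrame; one task per partition.
  def count(df: DataFrame): Int = df.rdd.getNumPartitions
}
```

If the number turns out far too low or too high for your cluster, df.repartition(n) returns a new dataframe with n partitions, at the cost of a shuffle.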

val filteredDf = df.select("rental", "start_time")
filteredDf.write.save("hdfs://final/processed")

then, once all the data is processed, copy it up to S3 via distcp
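Put together, the read/select/write steps might look something like the sketch below. It is only a sketch under some assumptions: Spark 2.x on the classpath, CSV files whose first line is a header naming the columns, CSV output, and it being acceptable to overwrite earlier output. The object name RemoveColumns and its run method are made up for illustration; the paths and column names are the placeholder ones used above.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

// Hypothetical wrapper object; only the spark.read / select / write calls are Spark API.
object RemoveColumns {
  // Read every CSV file under `input`, keep only `columns`, write the result to `output`.
  def run(spark: SparkSession, input: String, output: String, columns: Seq[String]): Unit = {
    val df: DataFrame = spark.read
      .option("header", "true") // assumption: each file starts with a header line
      .csv(input)

    // select(String, String*) keeps the named columns and drops everything else
    val filtered = df.select(columns.head, columns.tail: _*)

    filtered.write
      .option("header", "true")
      .mode("overwrite") // assumption: replacing earlier output is acceptable
      .csv(output)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("remove-columns").getOrCreate()
    try run(spark, "s3a://bucket/data", "hdfs://final/processed", Seq("rental", "start_time"))
    finally spark.stop()
  }
}
```

The select and the write both run on the workers, one task per partition, so the driver never has to hold the file contents itself. For a first local test you can pass local directories for input and output and build the session with .master("local[*]").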

I'd recommend you start doing this with a small number of files locally and get the code working. Then see if you can use it with s3 as the source/dest of data, again locally if you want (it's just slow), then move to in-EC2 for the bandwidth.

Bandwidth-wise, there are some pretty major performance issues with the s3n connector. S3a in Hadoop 2.7+ works, with Hadoop 2.8 having a lot more speedups, especially when using ORC and Parquet as a source, where there's a special "random access mode".
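As far as I know, in Hadoop 2.8 that random access mode is switched on through the experimental s3a option fs.s3a.experimental.input.fadvise, and Spark forwards any property prefixed with spark.hadoop. into the Hadoop configuration. A minimal sketch, assuming Hadoop 2.8+ s3a JARs on the classpath (the object and method names are made up for illustration):

```scala
import org.apache.spark.SparkConf

// Hypothetical helper; the property name is the Hadoop 2.8 s3a option (still marked experimental).
object S3aConf {
  // Ask s3a to optimise reads for random access, which suits columnar formats such as ORC and Parquet.
  def randomIo(conf: SparkConf): SparkConf =
    conf.set("spark.hadoop.fs.s3a.experimental.input.fadvise", "random")
}
```

The same property can be passed on the command line with --conf instead. Random mode is aimed at seek-heavy columnar reads; for sequential scans of CSV or text files the default is probably the better choice.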

Further reading
https://docs.hortonworks.com/HDPDocuments/HDCloudAWS/HDCloudAWS-1.14.1/bk_hdcloud-aws/content/s3-spark/index.html

https://docs.hortonworks.com/HDPDocuments/HDCloudAWS/HDCloudAWS-1.14.1/bk_hdcloud-aws/content/s3-performance/index.html


2. Is there a way to scale workers with Spark downloading and processing files, even if they are all Single Master Node?



I think there may be some terminology confusion here. You are going to have to have one process which is the Spark driver: either on your client machine, deployed somewhere in the cluster via YARN/Mesos, or running at a static location within a Spark standalone cluster. Everything other than the driver process is a worker, which will do the work.