You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@spark.apache.org by Dark Crusader <re...@gmail.com> on 2020/05/27 16:16:28 UTC

Spark dataframe hdfs vs s3

Hi all,

I am reading data from hdfs in the form of parquet files (around 3 GB) and
running an algorithm from the spark ml library.

If I create the same spark dataframe by reading data from S3, the same
algorithm takes considerably more time.

I don't understand why this is happening. Is this a chance occurence or are
the spark dataframes created different?

I don't understand how the data store would effect the algorithm
performance.

Any help would be appreciated. Thanks a lot.

Re: Spark dataframe hdfs vs s3

Posted by Anwar AliKhan <an...@gmail.com>.
Optimisation of Spark applications

Apache Spark <https://www.adaltas.com/en/tag/apache-spark/> is an in-memory
data processing tool widely used in companies to deal with Big Data issues.
Running a Spark application in production requires user-defined resources.
This article presents several Spark concepts to optimize the use of the
engine, both in the writing of the code and in the selection of execution
parameters. These concepts will be illustrated through a use case with a
focus on best practices for allocating ressources of a Spark applications
in a Hadoop Yarn <https://www.adaltas.com/en/tag/apache-yarn/> environment.
<https://www.adaltas.com/en/2020/03/30/compute-resources-allocation-spark-yarn/#spark-cluster-terminologies-and-modes>Spark
Cluster: terminologies and modes

Deploying a Spark application in a YARN cluster requires an understanding
of the “master-slave” model as well as the operation of several components:
the Cluster Manager, the Spark Driver, the Spark Executors and the Edge
Node concept.

The “master-slave” model defines two types of entities: the master controls
and centralizes the communications of the slaves. It is a model that is
often applied in the implementation of clusters and/or for parallel
processing. It is also the model used by Spark applications.

The *Cluster Manager* maintains the physical machines on which the Driver
and its Executors are going to run and allocates the requested resources to
the users. Spark supports 4 Cluster Managers: Apache YARN, Mesos,
Standalone and, recently, Kubernetes. We will focus on YARN.

The *Spark Driver* is the entity that manages the execution of the Spark
application (the master), each application is associated with a Driver. Its
role is to interpret the application’s code to transform it into a sequence
of tasks and to maintain all the states and tasks of the Executors.

The *Spark Executors* are the entities responsible for performing the tasks
assigned to them by the Driver (the slaves). They will read these tasks,
execute them and return their states (Success/Fail) and results. The
Executors are linked to only one application at a time.

The *Edge Node* is a physical/virtual machine where users will connect to
instantiate their Spark applications. It serves as an interface between the
cluster and the outside world. It is a comfort zone where components are
pre-installed and most importantly, pre-configured.
<https://www.adaltas.com/en/2020/03/30/compute-resources-allocation-spark-yarn/#execution-modes>Execution
modes

There are different ways to deploy a Spark application:

   - The *Cluster* mode: This is the most common, the user sends a JAR file
   or a Python script to the Cluster Manager. The latter will instantiate a
   Driver and Executors on the different nodes of the cluster. The CM is
   responsible for all processes related to the Spark application. We will use
   it to handle our example: it facilitates the allocation of resources and
   releases them as soon as the application is finished.
   - The *Client* mode: Almost identical to *cluster* mode with the
   difference that the driver is instantiated on the machine where the job is
   submitted, i.e. outside the cluster. It is often used for program
   development because the logs are directly displayed in the current
   terminal, and the instance of the driver is linked to the user’s session.
   This mode is not recommended in production because the Edge Node can
   quickly reach saturation in terms of resources and the Edge Node is a SPOF
   (Single Point Of Failure).
   - The *Local* mode: the Driver and Executors run on the machine on which
   the user is logged in. It is only recommended for the purpose of testing an
   application in a local environment or for executing unit tests.

The number of Executors and their respective resources are provided
directly in the spark-submit command, or via the configuration properties
injected at the creation of the SparkSession object. Once the Executors are
created, they will communicate with the Driver, which will distribute the
processing tasks.
<https://www.adaltas.com/en/2020/03/30/compute-resources-allocation-spark-yarn/#resources>
Resources

A Spark application works as follows: data is stored in memory, and the
CPUs are responsible for performing the tasks of an application. The
application is therefore constrained by the resources used, including
memory and CPUs, which are defined for the Driver and Executors.

Spark applications can generally be divided into two types:

   - *Memory-intensive*: Applications involving massive joins or HashMap
   processing. These operations are expensive in terms of memory.
   - *CPU-intensive*: All applications involving sorting operations or
   searching for particular data. These types of jobs become intensive
   depending on the frequency of these operations.

Some applications are both memory intensive and CPU intensive: some models
of Machine Learning, for example, require a large number of computationally
intensive operation loops and store the intermediate results in memory.

The operation of the Executor memory
<https://www.tutorialdocs.com/article/spark-memory-management.html> has two
main parts concerning storage and execution. Thanks to the *Unified Memory
Manager* mechanism, memory-storage and run-time memory share the same
space, allowing one to occupy the unused resources of the other.

   - The first is for storing data in the cache when using, for example,
   .cache() or broadcast().
   - The other part (execution) is used to store the temporary results of
   *shuffle*, *join*, aggregation, etc. processes.

Memory allocation to Executors is closely related to CPU allocation: one
core performs a task on one partition, so if an Executor has 4 cores, it
must have the capacity to store all 4 partitions as well as intermediate
results, metadata, etc… Thus, the user has to fix the amount of memory and
cores allocated to each Executor according to the application he wants to
process and the source file: a file is partitioned by default according to
the total number of cores in the cluster.

This link
<https://aws.amazon.com/blogs/big-data/best-practices-for-successfully-managing-memory-for-apache-spark-applications-on-amazon-emr/>
lists
various best practices for cluster use and configuration. The following
diagram, taken from the previous link, gives an overview of how the memory
of an Executor works. An important note is that the memory allocated to an
Executor will always be higher than the specified value due to the
memoryOverhead which defaults to 10% of the specified value.

[image: Spark memory]
<https://www.adaltas.com/static/db3f761e83688dadf5b5f3ccad2bbd3f/d48f1/sparkExecMemory.png>
<https://www.adaltas.com/en/2020/03/30/compute-resources-allocation-spark-yarn/#how-a-spark-application-works>How
a Spark Application Works

In a multi-user cluster, the resources available to each user are not
unlimited. They are constrained to a given amount of memory, CPU and
storage space, in order to avoid monopolization of resources by a limited
number of users. These allocation rules are defined and managed by the
cluster administrator in charge of its operation.

In the case of Apache YARN, these resources can be allocated by *file*.
This means that a user may only be allowed to submit applications in a
single YARN queue in which the amount of resources available is constrained
by a maximum memory and CPU size.

The components and their resources used by a Spark application are
configurable via:

   - the spark-submit command using the arguments --executor-memory,
   --executor-cores, --num-executors, --driver-cores and --driver-memory
    arguments.
   - the SparkSession object by configuring, for example,
.config("spark.executor.instances",
   "7") (see the scripts in the GitHub project
   <https://github.com/adaltas/spark-examples-resources/>).
   - the options in the spark-defaults.conf configuration file.

The user can also let the Spark decide how many Executors are needed
depending on the processing to be done via the following parameters:

spark = SparkSession.builder \
    .appName("<XxXxX>") \
    .config("spark.dynamicAllocation.enabled", "true") \
    .config("spark.executor.cores", "2") \
    .config("spark.dynamicAllocation.minExecutors", "1") \
    .config("spark.dynamicAllocation.maxExecutors", "5") \
    .getOrCreate()

Thus, the application does not monopolize more resources than necessary in
a multi-user environment. More details are described in this article
explaining how Facebook adjusts Apache Spark for large-scale workloads
<https://towardsdatascience.com/how-does-facebook-tune-apache-spark-for-large-scale-workloads-3238ddda0830>
.

Regarding the underlying filesystem where data is stored, two optimization
rules
<https://spoddutur.github.io/spark-notes/distribution_of_executors_cores_and_memory_for_spark_application.html>
are
important:

   - Partition size should be at least 128MB and, if possible, based on a
   key attribute.
   - The number of CPUs/Executor should be between 4 and 6.

In the Spark application presented below, we will use the 2018 New York
Green Taxi dataset. The following script will download the file and save it
in HDFS:

# Download the datasetcurl
https://data.cityofnewyork.us/api/views/w7fs-fd9i/rows.csv?accessType=DOWNLOAD
\
  -o ~/trip_taxi_green2018.csv# Create a HDFS directory
hdfs dfs -mkdir ~/nyctrip# Load the dataset into HDFS
hdfs dfs -put \
  ~/trip_taxi_green2018.csv \
  ~/nyctrip/trip_taxi_green2018.csv \
  -D dfs.block.size=128M# Remove the original datasetrm
~/trip_taxi_green2018.csv

Our file of 793MB divided into 128MB blocks gives us 793/128 = 6.19 or 7
partitions.

If we ask for 7 Executors, they will have respectively in memory ~113MB.
With 4 Executors with 2 CPUs, this time they will have ~200MB of data. We
have to think about the memory allocation according to the processed
application: if the dataset is transformed several times, a good starting
point is to allocate twice as many GB of RAM as there are cores per
Executors.
<https://www.adaltas.com/en/2020/03/30/compute-resources-allocation-spark-yarn/#developing-and-processing-a-spark-application>Developing
and processing a Spark application

We will discuss several aspects of optimization by writing a simple Spark
application in Python. This will be deployed in Spark via YARN using the
cluster deployment mode. As stated above, we will use the data from the
2018 New York green taxis. The problem will be to determine which of the
two taxi companies referenced in the dataset was the most efficient in
terms of trip numbers processed in 2018.

In this part, we detail each part of the code. The complete scripts are
available on GitHub
<https://github.com/adaltas/spark-examples-resources/scripts>.
<https://www.adaltas.com/en/2020/03/30/compute-resources-allocation-spark-yarn/#importing-and-declaring-a-sparksession>Importing
and declaring a SparkSession

For the sake of clarity, imports of Spark classes and functions are
generally reported first. In one Spark application, we explained that the
user must declare a SparkSession' followed by thebuilder’ class in order to
name and configure the processing options of the application via appName()
 and config("key", "value"). The .getOrCreate() argument checks whether a
SparkSession already exists or creates a new one. If a SparkSession already
exists and a new one is created, the options for the new one will also be
added to the previous one.

from pyspark.sql import SparkSessionfrom pyspark.sql.functions import
desc, broadcast
spark = SparkSession.builder \
    .appName ("Best Driver 2018") \
    .config("spark.sql.shuffle.partitions", "7") \
    .config("spark.executor.memory", "2g") \
    .config("spark.executor.instances", "4") \
    .config("spark.executor.cores", "2") \
    .getOrCreate()

<https://www.adaltas.com/en/2020/03/30/compute-resources-allocation-spark-yarn/#importing-the-dataset>Importing
the dataset

When a Spark application is processed in *Batch processing*, the user has
the choice of declaring the schema of the dataset or letting Spark infer
this schema. This is not the case for streaming applications where the user
has to declare the schema. Declaring the data schema can be done via a DDL
format (which is used here) or via the use of the StructType and StructField
 types.

schema = "VendorID INT,pickup_datetime TIMESTAMP,dropoff_datetime
TIMESTAMP,store_and_fwd_flag STRING,RatecodeID INT,PULocationID
INT,DOLocationID INT,passenger_count INT,trip_distance
FLOAT,fare_amount FLOAT,extra FLOAT,mta_tax FLOAT,tip_amount
FLOAT,tolls_amount FLOAT,ehail_fee FLOAT,improvement_surcharge
FLOAT,total_amount FLOAT,payment_type INT,trip_type
INT,congestion_surcharge FLOAT"

driver_df = spark.read \
.csv(path="/home/ferdinand.de-baecque-dsti/nyctrip/trip_taxi.csv",
schema=schema,header=True) \#
.csv(path="/home/ferdinand.de-baecque-dsti/nyctrip/trip_taxi.csv",
inferSchema=True,header=True) \
.select("VendorID") \
.repartition(7, "VendorID")

Here we use the .csv() function which is part of the DataFrameReader class
and allows to configure several parameters to create a DataFrame from the
dataset. Spark partitions the data according to the total number of cores
in the cluster. We end up “repartitioning” the DataFrame according to the
*VendorID* attribute using the .repartition(<#_partitions>, <"column_name">)
 function.

*Reading without schema*: with the option inferSchema=True.

[image: Execution time without schema]
<https://www.adaltas.com/static/f2979cd0e3dfe83c978b71b082617981/8bf8a/withoutSchema.png>

*Reading with schema*: with the schema=schema option.

[image: Execution time without schema]
<https://www.adaltas.com/static/ef59fe7d844ce6357783754f220ef3cd/e216b/withSchema.png>

Reading the dataset is faster when the user declares his schema.

Finally, if we remove the .select("VendorID") part from driver_df, we have:

[image: Execution time without select()]
<https://www.adaltas.com/static/fa9797b62a3ccafbbb6329b55eccdaf5/764d0/shuf7withoutSelect.png>
<https://www.adaltas.com/en/2020/03/30/compute-resources-allocation-spark-yarn/#request-definition>Request
definition

The query of our problem is in one line thanks to the native Spark
functions called *higher-ordered functions*. They are optimized by the
engine and are to be preferred to *User-Defined Functions* (UDF) written by
a user. Using a *higher-order* function allows Spark to understand what the
user is trying to achieve and optimize the processing of the application.
On the other hand, Spark does not see the content of a UDF (opaque) and
therefore, does not understand the purpose of the UDF and is not able to
optimize its processing. New native functions are added as Spark evolves.

Native functions work as follows: the engine builds several “logical”
planes and then implements and compares the way these planes are processed
to select the best one - it is this plane, called *Physical Plan*, which is
printed with the .explain() function. These optimization aspects are
related to the Tungsten Project
<https://databricks.com/blog/2015/04/28/project-tungsten-bringing-spark-closer-to-bare-metal.html>
and
the Catalyst Optimizer <https://databricks.com/glossary/catalyst-optimizer>.

count_trip = driver_df.groupBy("VendorID").count().sort(desc("count"))
count_trip.explain()
count_trip.show()

In our query, we start by grouping the companies together with groupBy().
Then we count the number of occurrences with count(). Finally, we sort the
results with sort(desc()) so that the best performing company appears at
the top. This last function is part of the family of *wide
transformations* creating
a shuffle between Executors passing through the network; the graph below
illustrates this concept. By default, Spark creates 200 partitions when
these *wide transformations* are called; a good practice is to redefine
this value according to the scenario.

[image: Wide Transformation]
<https://www.adaltas.com/static/30fb19dbd1b025323e082978ddcc5836/e0cdb/wideTransform.png>

When declaring the SparkSession class, we set
.config("spark.sql.shuffle.partitions",
"7"). This option is related to *wide transformations*, it limits the
creation of partitions after a shuffle: the sort(desc()) operation creates
7 partitions instead of 200.

Without this configuration option, we have:

[image: Execution time whithout shuffling]
<https://www.adaltas.com/static/2ac1e4d41d5bee5b307d5545c7d91796/10b63/defShuffle.png>

With the option, we have:

[image: Execution time with shuffling]
<https://www.adaltas.com/static/485fba94816edb52a88e135c8518254d/09262/shuf7withSelect.png>

Changing the default value of this configuration parameter can greatly
optimize these types of transformations. Here, we go from 0.6 to 0.2
seconds, limiting the number of partitions created by the shuffles to 7.
The value has been set according to the number of cores in the cluster.

The physical plane given by explain() is:

[image: Physical Plan]
<https://www.adaltas.com/static/b76d2bf23667f3dc0ab26820bfc291a4/bcec6/physicalPlan.png>

Finally, .show() is an *Action* in Spark which returns the result in the
logs or in the console depending on Spark’s configuration. It is this
function that will trigger the execution of the transformations that
precede it in the application. This is the principle of Lazy Evaluation
<https://www.quora.com/What-is-lazy-evaluation-in-Apache-Spark>.

The result of our query is:

[image: Result Query Count]
<https://www.adaltas.com/static/12b99c0c330c7a54945d71907d91aff0/8604e/resultQueryCount.png>
<https://www.adaltas.com/en/2020/03/30/compute-resources-allocation-spark-yarn/#processing-the-request>Processing
the request

The previous captures were made with 7 Executives of 1 CPU and 2GB of RAM.
The command to run the application in Spark is:

spark-submit --master yarn --deploy-mode cluster \
  --queue adaltas ./scripts_countTrip/query_7Exécuteurs.py

In the SparkSession.builder, if we set to 4 Executives each having 2 CPUs
and 4g of RAM while modifying spark.shuffle.partitions with *8*, we get a
gain of 1 second on the scan of our file:

spark-submit --master yarn --deploy-mode cluster \
  --queue adaltas scripts/scripts_countTrip/query_4executors.py \
  <Path_to_your_HDFS_Local_file_>

[image: Execution with 4 Execs]
<https://www.adaltas.com/static/a39297dd1cba28c7c84c1abd8094acfa/10b63/Execs4cpu2.png>

The documentation
<https://spark.apache.org/docs/latest/tuning.html#level-of-parallelism> advises
to have 2-3 times more partitions than cores available in the cluster, the
data and thus, the tasks will be better distributed. Cores do not
necessarily process a job at the same speed. For example, we can
repartition our file into two partitions according to the *VendorID* column
(because it has two occurrences) by adding the .repartition(2,
"VendorID") function
at the end of driver_df. With 2 Executors with 1 CPU, one CPU will process
its .count() much slower than the other.
<https://www.adaltas.com/en/2020/03/30/compute-resources-allocation-spark-yarn/#joins-in-spark>Joins
in Spark

Joining operations are recurrent in Spark applications, they must be
optimized to avoid unnecessary shuffles. We will create a small dataset
directly in the application and the goal will be to join this *small* DataFrame
with our *large* taxi database. In this case, a good practice is to use the
broadcast() function to duplicate the contents of the small DataFrame on
all Executors. Shuffles will not be necessary because each Executor will
have its own partition of the large DataFrame and also the entire small
DataFrame.

*Join 1*: Default join.

littleDf = spark.createDataFrame( 1, "The First Company"), (2, "The
second company"), ("VendorID", "VendorName")# Join 1
detailledDf = driver_df.join(littleDf, driver_df["VendorID"] ==
littleDf["VendorID"])
detailedDf.explain()
detailedDf.show()

spark-submit --master yarn --deploy-mode cluster --queue adaltas --
  scripts/scripts_join/default_join.py \
  <Path_to_your_HDFS_Local_file_>


   - The *Physical Plan* is:

[image: Physical join plan 1]
<https://www.adaltas.com/static/476132f7e0e58a10d8b4879357e29d86/00b3d/physicalPlanDefaultJoin.png>

   - The execution time of the application is:

[image: Execution time join 2]
<https://www.adaltas.com/static/7401fa21e10a6fc18fff92437705e1ef/7e318/runtimeExecutionDefaultJoin.png>

*Join 2*: Using the *BroadcastJoin*.

detailledDfBroad = driver_df.join(broadcast(littleDf),
driver_df["VendorID"] == littleDf["VendorID"])
detailedDfBroad.explain()
detailedDfBroad.show()

spark-submit --master yarn --deploy-mode cluster --queue adaltas --
  scripts/scripts_join/broadcast_join.py \
  <Path_to_your_HDFS_Local_file_>


   - The *Physical Plan* gives:

[image: Physical join plan 2]
<https://www.adaltas.com/static/ad2bf7cac759350b958dd98d9d952717/b3154/physicalPlanBroadJoin.png>

   - The execution time:

[image: Execution time join 2]
<https://www.adaltas.com/static/3ed4ab91b17feed1359ce5c7b6ffef60/764d0/runtimeBroadJoin.png>

The broadcast() technique takes us from ~13 seconds to ~10 seconds in
total. This is effective if the entire small dataset can be stored in the
cache of each Executors.

Finally, doing what the Spark documentation recommends and knowing the
number of blocks in HDFS - that is, spreading the data over 16 (8CPUx2)
partitions and setting the shuffles to 16 partitions - reduces the total
*TaskTime* of the *join 2* by 0.1 seconds:

spark-submit --master yarn --deploy-mode cluster --queue adaltas
scripts_join/final_countJoin.py

[image: Task Time 16 partitions]
<https://www.adaltas.com/static/38e5b233aaa3d9019eb5fcdc427668bb/c2b0b/tasTime16Part.png>
<https://www.adaltas.com/en/2020/03/30/compute-resources-allocation-spark-yarn/#monitoring>
Monitoring

All screenshots for the application come from the YARN ResourceManager web
interface. This is where all the information from the Spark applications
can be found. Thanks to the Spark History Server which is activated in the
cluster, the metrics of the Spark applications are accessible after their
processing.

[image: Web UI RM YARN]
<https://www.adaltas.com/static/0019c24e723886c1cd305ebf431dae3f/df88b/yarnRMwebUI.png>

By clicking on the application ID: you can either look at its *Logs* -
where the query results appear in my case - or click on *ApplicationMaster
/ History* to get details about its tasks and Executors.

[image: History and logs]
<https://www.adaltas.com/static/a83fa4ad40b698f4483568b1040497f5/7792f/historyApp.png>

After clicking on *History* and going to the *Stages* tab, you can see the
DAG (Direct Acyclic Graph) of each *stages* which is equivalent to a part
of the content returned by explain() in graphical form:

[image: DAG]
<https://www.adaltas.com/static/5ed2a03b0577e4ac0a047d1e39918647/d024a/DAG.png>

If a Spark user wants to standardize the *Task Time* of each Executor as
proof of the correct distribution of data, this information can be found in
the *Executors* tab:

[image: Execution time with 4 Execessors 2GB]
<https://www.adaltas.com/static/ad3671688d34120a2192ed1e509f0d4d/3b477/ExecWith2RAM.png>

If this is not the case, there is often a problem with the distribution of
the dataset. One solution is to increase the number of partitions using the
.repartition() function, or partition the dataset before consuming it in
Spark. If your application is taking longer than you thought, you can track
the progress of each Executor’s tasks by going to the *Executors* tab and
clicking on the different *Thread Dumps*.

Here, reducing the RAM of the Executors to 1GB instead of 2 has no impact:
no transformation is done on all the data. This choice is specific to the
scenario because each Executor has 2 partitions with a total of ~200MB.
This excess memory can be used by another application.

[image: Execution time with 4 1GB Executors]
<https://www.adaltas.com/static/219634c0e74500cb195d8bfd826f5f23/3955b/ExecWith1RAM.png>
<https://www.adaltas.com/en/2020/03/30/compute-resources-allocation-spark-yarn/#conclusion>
Conclusion

A number of applications run concurrently in a production environment: the
proper allocation of resources to Spark components tends to maximize the
performance of an application as well as the number of applications hosted.

We have presented, through a use case, the relationships between cores &
partitions, memory & transformations as well as the operation of native
functions. Our application was written in Python although this language
adds a step to the processing, called *SerDe*, which this article
<https://medium.com/wbaa/using-scala-udfs-in-pyspark-b70033dd69b9> explains
very well. This is the main reason why applications written in Scala
perform better. Although it is not our case here, a comparison is made in
GitHub
<https://github.com/adaltas/spark-examples-resources/tree/master/scala-maven-jar>.
As far as data consumption is concerned, Spark advises to use the
*Parquet* format:
it keeps the data schema, the data is compressed and saved by columns
(attributes) which makes it easier to extract.

On Sat, 30 May 2020, 08:29 Dark Crusader, <re...@gmail.com>
wrote:

> Thanks all for the replies.
> I am switching to hdfs since it seems like an easier solution.
> To answer some of your questions, my hdfs space is a part of my nodes I
> use for computation on spark.
> From what I understand, this helps because of the data locality advantage.
> Which means that there is less network IO and data redistribution on the
> nodes.
>
> Thanks for your help.
> Aditya
>
> On Sat, 30 May, 2020, 10:48 am Jörn Franke, <jo...@gmail.com> wrote:
>
>> Maybe some aws network optimized instances with higher bandwidth will
>> improve the situation.
>>
>> Am 27.05.2020 um 19:51 schrieb Dark Crusader <
>> relinquisheddragon@gmail.com>:
>>
>> 
>> Hi Jörn,
>>
>> Thanks for the reply. I will try to create a easier example to reproduce
>> the issue.
>>
>> I will also try your suggestion to look into the UI. Can you guide on
>> what I should be looking for?
>>
>> I was already using the s3a protocol to compare the times.
>>
>> My hunch is that multiple reads from S3 are required because of improper
>> caching of intermediate data. And maybe hdfs is doing a better job at this.
>> Does this make sense?
>>
>> I would also like to add that we built an extra layer on S3 which might
>> be adding to even slower times.
>>
>> Thanks for your help.
>>
>> On Wed, 27 May, 2020, 11:03 pm Jörn Franke, <jo...@gmail.com> wrote:
>>
>>> Have you looked in Spark UI why this is the case ?
>>> S3 Reading can take more time - it depends also what s3 url you are
>>> using : s3a vs s3n vs S3.
>>>
>>> It could help after some calculation to persist in-memory or on HDFS.
>>> You can also initially load from S3 and store on HDFS and work from there .
>>>
>>> HDFS offers Data locality for the tasks, ie the tasks start on the nodes
>>> where the data is. Depending on what s3 „protocol“ you are using you might
>>> be also more punished with performance.
>>>
>>> Try s3a as a protocol (replace all s3n with s3a).
>>>
>>> You can also use s3 url but this requires a special bucket
>>> configuration, a dedicated empty bucket and it lacks some ineroperability
>>> with other AWS services.
>>>
>>> Nevertheless, it could be also something else with the code. Can you
>>> post an example reproducing the issue?
>>>
>>> > Am 27.05.2020 um 18:18 schrieb Dark Crusader <
>>> relinquisheddragon@gmail.com>:
>>> >
>>> > 
>>> > Hi all,
>>> >
>>> > I am reading data from hdfs in the form of parquet files (around 3 GB)
>>> and running an algorithm from the spark ml library.
>>> >
>>> > If I create the same spark dataframe by reading data from S3, the same
>>> algorithm takes considerably more time.
>>> >
>>> > I don't understand why this is happening. Is this a chance occurence
>>> or are the spark dataframes created different?
>>> >
>>> > I don't understand how the data store would effect the algorithm
>>> performance.
>>> >
>>> > Any help would be appreciated. Thanks a lot.
>>>
>>

Re: Spark dataframe hdfs vs s3

Posted by Dark Crusader <re...@gmail.com>.
Thanks all for the replies.
I am switching to hdfs since it seems like an easier solution.
To answer some of your questions, my hdfs space is a part of my nodes I use
for computation on spark.
From what I understand, this helps because of the data locality advantage.
Which means that there is less network IO and data redistribution on the
nodes.

Thanks for your help.
Aditya

On Sat, 30 May, 2020, 10:48 am Jörn Franke, <jo...@gmail.com> wrote:

> Maybe some aws network optimized instances with higher bandwidth will
> improve the situation.
>
> Am 27.05.2020 um 19:51 schrieb Dark Crusader <relinquisheddragon@gmail.com
> >:
>
> 
> Hi Jörn,
>
> Thanks for the reply. I will try to create a easier example to reproduce
> the issue.
>
> I will also try your suggestion to look into the UI. Can you guide on what
> I should be looking for?
>
> I was already using the s3a protocol to compare the times.
>
> My hunch is that multiple reads from S3 are required because of improper
> caching of intermediate data. And maybe hdfs is doing a better job at this.
> Does this make sense?
>
> I would also like to add that we built an extra layer on S3 which might be
> adding to even slower times.
>
> Thanks for your help.
>
> On Wed, 27 May, 2020, 11:03 pm Jörn Franke, <jo...@gmail.com> wrote:
>
>> Have you looked in Spark UI why this is the case ?
>> S3 Reading can take more time - it depends also what s3 url you are using
>> : s3a vs s3n vs S3.
>>
>> It could help after some calculation to persist in-memory or on HDFS. You
>> can also initially load from S3 and store on HDFS and work from there .
>>
>> HDFS offers Data locality for the tasks, ie the tasks start on the nodes
>> where the data is. Depending on what s3 „protocol“ you are using you might
>> be also more punished with performance.
>>
>> Try s3a as a protocol (replace all s3n with s3a).
>>
>> You can also use s3 url but this requires a special bucket configuration,
>> a dedicated empty bucket and it lacks some ineroperability with other AWS
>> services.
>>
>> Nevertheless, it could be also something else with the code. Can you post
>> an example reproducing the issue?
>>
>> > Am 27.05.2020 um 18:18 schrieb Dark Crusader <
>> relinquisheddragon@gmail.com>:
>> >
>> > 
>> > Hi all,
>> >
>> > I am reading data from hdfs in the form of parquet files (around 3 GB)
>> and running an algorithm from the spark ml library.
>> >
>> > If I create the same spark dataframe by reading data from S3, the same
>> algorithm takes considerably more time.
>> >
>> > I don't understand why this is happening. Is this a chance occurence or
>> are the spark dataframes created different?
>> >
>> > I don't understand how the data store would effect the algorithm
>> performance.
>> >
>> > Any help would be appreciated. Thanks a lot.
>>
>

Re: Spark dataframe hdfs vs s3

Posted by Jörn Franke <jo...@gmail.com>.
Maybe some aws network optimized instances with higher bandwidth will improve the situation.

> Am 27.05.2020 um 19:51 schrieb Dark Crusader <re...@gmail.com>:
> 
> 
> Hi Jörn,
> 
> Thanks for the reply. I will try to create a easier example to reproduce the issue.
> 
> I will also try your suggestion to look into the UI. Can you guide on what I should be looking for? 
> 
> I was already using the s3a protocol to compare the times.
> 
> My hunch is that multiple reads from S3 are required because of improper caching of intermediate data. And maybe hdfs is doing a better job at this. Does this make sense?
> 
> I would also like to add that we built an extra layer on S3 which might be adding to even slower times.
> 
> Thanks for your help.
> 
>> On Wed, 27 May, 2020, 11:03 pm Jörn Franke, <jo...@gmail.com> wrote:
>> Have you looked in Spark UI why this is the case ? 
>> S3 Reading can take more time - it depends also what s3 url you are using : s3a vs s3n vs S3.
>> 
>> It could help after some calculation to persist in-memory or on HDFS. You can also initially load from S3 and store on HDFS and work from there . 
>> 
>> HDFS offers Data locality for the tasks, ie the tasks start on the nodes where the data is. Depending on what s3 „protocol“ you are using you might be also more punished with performance.
>> 
>> Try s3a as a protocol (replace all s3n with s3a).
>> 
>> You can also use s3 url but this requires a special bucket configuration, a dedicated empty bucket and it lacks some ineroperability with other AWS services.
>> 
>> Nevertheless, it could be also something else with the code. Can you post an example reproducing the issue?
>> 
>> > Am 27.05.2020 um 18:18 schrieb Dark Crusader <re...@gmail.com>:
>> > 
>> > 
>> > Hi all,
>> > 
>> > I am reading data from hdfs in the form of parquet files (around 3 GB) and running an algorithm from the spark ml library.
>> > 
>> > If I create the same spark dataframe by reading data from S3, the same algorithm takes considerably more time.
>> > 
>> > I don't understand why this is happening. Is this a chance occurence or are the spark dataframes created different? 
>> > 
>> > I don't understand how the data store would effect the algorithm performance.
>> > 
>> > Any help would be appreciated. Thanks a lot.

Re: Spark dataframe hdfs vs s3

Posted by randy clinton <ra...@gmail.com>.
HDFS is simply a better place to make performant reads and on top of that
the data is closer to your spark job. The databricks link from above will
show you that where they find a 6x read throughput difference between the
two.

If your HDFS is part of the same Spark cluster than it should be an
incredibly fast read vs reaching out to S3 for the data.

They are different types of storage solving different things.

Something I have seen in workflows is something other people have suggested
above, is a stage where you load data from S3 into HDFS, then move on to
you other work with it and maybe finally persist outside of HDFS.

On Fri, May 29, 2020 at 2:09 PM Bin Fan <fa...@gmail.com> wrote:

> Try to deploy Alluxio as a caching layer on top of S3, providing Spark a
> similar HDFS interface?
> Like in this article:
>
> https://www.alluxio.io/blog/accelerate-spark-and-hive-jobs-on-aws-s3-by-10x-with-alluxio-tiered-storage/
>
>
> On Wed, May 27, 2020 at 6:52 PM Dark Crusader <
> relinquisheddragon@gmail.com> wrote:
>
>> Hi Randy,
>>
>> Yes, I'm using parquet on both S3 and hdfs.
>>
>> On Thu, 28 May, 2020, 2:38 am randy clinton, <ra...@gmail.com>
>> wrote:
>>
>>> Is the file Parquet on S3 or is it some other file format?
>>>
>>> In general I would assume that HDFS read/writes are more performant for
>>> spark jobs.
>>>
>>> For instance, consider how well partitioned your HDFS file is vs the S3
>>> file.
>>>
>>> On Wed, May 27, 2020 at 1:51 PM Dark Crusader <
>>> relinquisheddragon@gmail.com> wrote:
>>>
>>>> Hi Jörn,
>>>>
>>>> Thanks for the reply. I will try to create a easier example to
>>>> reproduce the issue.
>>>>
>>>> I will also try your suggestion to look into the UI. Can you guide on
>>>> what I should be looking for?
>>>>
>>>> I was already using the s3a protocol to compare the times.
>>>>
>>>> My hunch is that multiple reads from S3 are required because of
>>>> improper caching of intermediate data. And maybe hdfs is doing a better job
>>>> at this. Does this make sense?
>>>>
>>>> I would also like to add that we built an extra layer on S3 which might
>>>> be adding to even slower times.
>>>>
>>>> Thanks for your help.
>>>>
>>>> On Wed, 27 May, 2020, 11:03 pm Jörn Franke, <jo...@gmail.com>
>>>> wrote:
>>>>
>>>>> Have you looked in Spark UI why this is the case ?
>>>>> S3 Reading can take more time - it depends also what s3 url you are
>>>>> using : s3a vs s3n vs S3.
>>>>>
>>>>> It could help after some calculation to persist in-memory or on HDFS.
>>>>> You can also initially load from S3 and store on HDFS and work from there .
>>>>>
>>>>> HDFS offers Data locality for the tasks, ie the tasks start on the
>>>>> nodes where the data is. Depending on what s3 „protocol“ you are using you
>>>>> might be also more punished with performance.
>>>>>
>>>>> Try s3a as a protocol (replace all s3n with s3a).
>>>>>
>>>>> You can also use s3 url but this requires a special bucket
>>>>> configuration, a dedicated empty bucket and it lacks some ineroperability
>>>>> with other AWS services.
>>>>>
>>>>> Nevertheless, it could be also something else with the code. Can you
>>>>> post an example reproducing the issue?
>>>>>
>>>>> > Am 27.05.2020 um 18:18 schrieb Dark Crusader <
>>>>> relinquisheddragon@gmail.com>:
>>>>> >
>>>>> > 
>>>>> > Hi all,
>>>>> >
>>>>> > I am reading data from hdfs in the form of parquet files (around 3
>>>>> GB) and running an algorithm from the spark ml library.
>>>>> >
>>>>> > If I create the same spark dataframe by reading data from S3, the
>>>>> same algorithm takes considerably more time.
>>>>> >
>>>>> > I don't understand why this is happening. Is this a chance occurence
>>>>> or are the spark dataframes created different?
>>>>> >
>>>>> > I don't understand how the data store would effect the algorithm
>>>>> performance.
>>>>> >
>>>>> > Any help would be appreciated. Thanks a lot.
>>>>>
>>>>
>>>
>>> --
>>> I appreciate your time,
>>>
>>> ~Randy
>>>
>>

-- 
I appreciate your time,

~Randy

Re: Spark dataframe hdfs vs s3

Posted by Bin Fan <fa...@gmail.com>.
Try to deploy Alluxio as a caching layer on top of S3, providing Spark a
similar HDFS interface?
Like in this article:
https://www.alluxio.io/blog/accelerate-spark-and-hive-jobs-on-aws-s3-by-10x-with-alluxio-tiered-storage/


On Wed, May 27, 2020 at 6:52 PM Dark Crusader <re...@gmail.com>
wrote:

> Hi Randy,
>
> Yes, I'm using parquet on both S3 and hdfs.
>
> On Thu, 28 May, 2020, 2:38 am randy clinton, <ra...@gmail.com>
> wrote:
>
>> Is the file Parquet on S3 or is it some other file format?
>>
>> In general I would assume that HDFS read/writes are more performant for
>> spark jobs.
>>
>> For instance, consider how well partitioned your HDFS file is vs the S3
>> file.
>>
>> On Wed, May 27, 2020 at 1:51 PM Dark Crusader <
>> relinquisheddragon@gmail.com> wrote:
>>
>>> Hi Jörn,
>>>
>>> Thanks for the reply. I will try to create a easier example to reproduce
>>> the issue.
>>>
>>> I will also try your suggestion to look into the UI. Can you guide on
>>> what I should be looking for?
>>>
>>> I was already using the s3a protocol to compare the times.
>>>
>>> My hunch is that multiple reads from S3 are required because of improper
>>> caching of intermediate data. And maybe hdfs is doing a better job at this.
>>> Does this make sense?
>>>
>>> I would also like to add that we built an extra layer on S3 which might
>>> be adding to even slower times.
>>>
>>> Thanks for your help.
>>>
>>> On Wed, 27 May, 2020, 11:03 pm Jörn Franke, <jo...@gmail.com>
>>> wrote:
>>>
>>>> Have you looked in Spark UI why this is the case ?
>>>> S3 Reading can take more time - it depends also what s3 url you are
>>>> using : s3a vs s3n vs S3.
>>>>
>>>> It could help after some calculation to persist in-memory or on HDFS.
>>>> You can also initially load from S3 and store on HDFS and work from there .
>>>>
>>>> HDFS offers Data locality for the tasks, ie the tasks start on the
>>>> nodes where the data is. Depending on what s3 „protocol“ you are using you
>>>> might be also more punished with performance.
>>>>
>>>> Try s3a as a protocol (replace all s3n with s3a).
>>>>
>>>> You can also use s3 url but this requires a special bucket
>>>> configuration, a dedicated empty bucket and it lacks some ineroperability
>>>> with other AWS services.
>>>>
>>>> Nevertheless, it could be also something else with the code. Can you
>>>> post an example reproducing the issue?
>>>>
>>>> > Am 27.05.2020 um 18:18 schrieb Dark Crusader <
>>>> relinquisheddragon@gmail.com>:
>>>> >
>>>> > 
>>>> > Hi all,
>>>> >
>>>> > I am reading data from hdfs in the form of parquet files (around 3
>>>> GB) and running an algorithm from the spark ml library.
>>>> >
>>>> > If I create the same spark dataframe by reading data from S3, the
>>>> same algorithm takes considerably more time.
>>>> >
>>>> > I don't understand why this is happening. Is this a chance occurence
>>>> or are the spark dataframes created different?
>>>> >
>>>> > I don't understand how the data store would effect the algorithm
>>>> performance.
>>>> >
>>>> > Any help would be appreciated. Thanks a lot.
>>>>
>>>
>>
>> --
>> I appreciate your time,
>>
>> ~Randy
>>
>

Re: Spark dataframe hdfs vs s3

Posted by Kanwaljit Singh <ka...@uwaterloo.ca>.
You can’t play much if it is a streaming job. But in case of batch jobs, sometimes teams will copy their S3 data to HDFS in prep for the next run :D

From: randy clinton <ra...@gmail.com>
Date: Thursday, May 28, 2020 at 5:50 AM
To: Dark Crusader <re...@gmail.com>
Cc: Jörn Franke <jo...@gmail.com>, user <us...@spark.apache.org>
Subject: Re: Spark dataframe hdfs vs s3

See if this helps

"That is to say, on a per node basis, HDFS can yield 6X higher read throughput than S3. Thus, given that the S3 is 10x cheaper than HDFS, we find that S3 is almost 2x better compared to HDFS on performance per dollar."

https://databricks.com/blog/2017/05/31/top-5-reasons-for-choosing-s3-over-hdfs.html


On Wed, May 27, 2020, 9:51 PM Dark Crusader <re...@gmail.com>> wrote:
Hi Randy,

Yes, I'm using parquet on both S3 and hdfs.

On Thu, 28 May, 2020, 2:38 am randy clinton, <ra...@gmail.com>> wrote:
Is the file Parquet on S3 or is it some other file format?

In general I would assume that HDFS read/writes are more performant for spark jobs.

For instance, consider how well partitioned your HDFS file is vs the S3 file.

On Wed, May 27, 2020 at 1:51 PM Dark Crusader <re...@gmail.com>> wrote:
Hi Jörn,

Thanks for the reply. I will try to create a easier example to reproduce the issue.

I will also try your suggestion to look into the UI. Can you guide on what I should be looking for?

I was already using the s3a protocol to compare the times.

My hunch is that multiple reads from S3 are required because of improper caching of intermediate data. And maybe hdfs is doing a better job at this. Does this make sense?

I would also like to add that we built an extra layer on S3 which might be adding to even slower times.

Thanks for your help.

On Wed, 27 May, 2020, 11:03 pm Jörn Franke, <jo...@gmail.com>> wrote:
Have you looked in Spark UI why this is the case ?
S3 Reading can take more time - it depends also what s3 url you are using : s3a vs s3n vs S3.

It could help after some calculation to persist in-memory or on HDFS. You can also initially load from S3 and store on HDFS and work from there .

HDFS offers Data locality for the tasks, ie the tasks start on the nodes where the data is. Depending on what s3 „protocol“ you are using you might be also more punished with performance.

Try s3a as a protocol (replace all s3n with s3a).

You can also use s3 url but this requires a special bucket configuration, a dedicated empty bucket and it lacks some ineroperability with other AWS services.

Nevertheless, it could be also something else with the code. Can you post an example reproducing the issue?

> Am 27.05.2020 um 18:18 schrieb Dark Crusader <re...@gmail.com>>:
>
>
> Hi all,
>
> I am reading data from hdfs in the form of parquet files (around 3 GB) and running an algorithm from the spark ml library.
>
> If I create the same spark dataframe by reading data from S3, the same algorithm takes considerably more time.
>
> I don't understand why this is happening. Is this a chance occurence or are the spark dataframes created different?
>
> I don't understand how the data store would effect the algorithm performance.
>
> Any help would be appreciated. Thanks a lot.


--
I appreciate your time,

~Randy

Re: Spark dataframe hdfs vs s3

Posted by randy clinton <ra...@gmail.com>.
See if this helps

"That is to say, on a per node basis, HDFS can yield 6X higher read
throughput than S3. Thus, *given that the S3 is 10x cheaper than HDFS, we
find that S3 is almost 2x better compared to HDFS on performance per
dollar."*

*https://databricks.com/blog/2017/05/31/top-5-reasons-for-choosing-s3-over-hdfs.html
<https://databricks.com/blog/2017/05/31/top-5-reasons-for-choosing-s3-over-hdfs.html>*


On Wed, May 27, 2020, 9:51 PM Dark Crusader <re...@gmail.com>
wrote:

> Hi Randy,
>
> Yes, I'm using parquet on both S3 and hdfs.
>
> On Thu, 28 May, 2020, 2:38 am randy clinton, <ra...@gmail.com>
> wrote:
>
>> Is the file Parquet on S3 or is it some other file format?
>>
>> In general I would assume that HDFS read/writes are more performant for
>> spark jobs.
>>
>> For instance, consider how well partitioned your HDFS file is vs the S3
>> file.
>>
>> On Wed, May 27, 2020 at 1:51 PM Dark Crusader <
>> relinquisheddragon@gmail.com> wrote:
>>
>>> Hi Jörn,
>>>
>>> Thanks for the reply. I will try to create a easier example to reproduce
>>> the issue.
>>>
>>> I will also try your suggestion to look into the UI. Can you guide on
>>> what I should be looking for?
>>>
>>> I was already using the s3a protocol to compare the times.
>>>
>>> My hunch is that multiple reads from S3 are required because of improper
>>> caching of intermediate data. And maybe hdfs is doing a better job at this.
>>> Does this make sense?
>>>
>>> I would also like to add that we built an extra layer on S3 which might
>>> be adding to even slower times.
>>>
>>> Thanks for your help.
>>>
>>> On Wed, 27 May, 2020, 11:03 pm Jörn Franke, <jo...@gmail.com>
>>> wrote:
>>>
>>>> Have you looked in Spark UI why this is the case ?
>>>> S3 Reading can take more time - it depends also what s3 url you are
>>>> using : s3a vs s3n vs S3.
>>>>
>>>> It could help after some calculation to persist in-memory or on HDFS.
>>>> You can also initially load from S3 and store on HDFS and work from there .
>>>>
>>>> HDFS offers Data locality for the tasks, ie the tasks start on the
>>>> nodes where the data is. Depending on what s3 „protocol“ you are using you
>>>> might be also more punished with performance.
>>>>
>>>> Try s3a as a protocol (replace all s3n with s3a).
>>>>
>>>> You can also use s3 url but this requires a special bucket
>>>> configuration, a dedicated empty bucket and it lacks some ineroperability
>>>> with other AWS services.
>>>>
>>>> Nevertheless, it could be also something else with the code. Can you
>>>> post an example reproducing the issue?
>>>>
>>>> > Am 27.05.2020 um 18:18 schrieb Dark Crusader <
>>>> relinquisheddragon@gmail.com>:
>>>> >
>>>> > 
>>>> > Hi all,
>>>> >
>>>> > I am reading data from hdfs in the form of parquet files (around 3
>>>> GB) and running an algorithm from the spark ml library.
>>>> >
>>>> > If I create the same spark dataframe by reading data from S3, the
>>>> same algorithm takes considerably more time.
>>>> >
>>>> > I don't understand why this is happening. Is this a chance occurence
>>>> or are the spark dataframes created different?
>>>> >
>>>> > I don't understand how the data store would effect the algorithm
>>>> performance.
>>>> >
>>>> > Any help would be appreciated. Thanks a lot.
>>>>
>>>
>>
>> --
>> I appreciate your time,
>>
>> ~Randy
>>
>

Re: Spark dataframe hdfs vs s3

Posted by Dark Crusader <re...@gmail.com>.
Hi Randy,

Yes, I'm using parquet on both S3 and hdfs.

On Thu, 28 May, 2020, 2:38 am randy clinton, <ra...@gmail.com> wrote:

> Is the file Parquet on S3 or is it some other file format?
>
> In general I would assume that HDFS read/writes are more performant for
> spark jobs.
>
> For instance, consider how well partitioned your HDFS file is vs the S3
> file.
>
> On Wed, May 27, 2020 at 1:51 PM Dark Crusader <
> relinquisheddragon@gmail.com> wrote:
>
>> Hi Jörn,
>>
>> Thanks for the reply. I will try to create a easier example to reproduce
>> the issue.
>>
>> I will also try your suggestion to look into the UI. Can you guide on
>> what I should be looking for?
>>
>> I was already using the s3a protocol to compare the times.
>>
>> My hunch is that multiple reads from S3 are required because of improper
>> caching of intermediate data. And maybe hdfs is doing a better job at this.
>> Does this make sense?
>>
>> I would also like to add that we built an extra layer on S3 which might
>> be adding to even slower times.
>>
>> Thanks for your help.
>>
>> On Wed, 27 May, 2020, 11:03 pm Jörn Franke, <jo...@gmail.com> wrote:
>>
>>> Have you looked in Spark UI why this is the case ?
>>> S3 Reading can take more time - it depends also what s3 url you are
>>> using : s3a vs s3n vs S3.
>>>
>>> It could help after some calculation to persist in-memory or on HDFS.
>>> You can also initially load from S3 and store on HDFS and work from there .
>>>
>>> HDFS offers Data locality for the tasks, ie the tasks start on the nodes
>>> where the data is. Depending on what s3 „protocol“ you are using you might
>>> be also more punished with performance.
>>>
>>> Try s3a as a protocol (replace all s3n with s3a).
>>>
>>> You can also use s3 url but this requires a special bucket
>>> configuration, a dedicated empty bucket and it lacks some ineroperability
>>> with other AWS services.
>>>
>>> Nevertheless, it could be also something else with the code. Can you
>>> post an example reproducing the issue?
>>>
>>> > Am 27.05.2020 um 18:18 schrieb Dark Crusader <
>>> relinquisheddragon@gmail.com>:
>>> >
>>> > 
>>> > Hi all,
>>> >
>>> > I am reading data from hdfs in the form of parquet files (around 3 GB)
>>> and running an algorithm from the spark ml library.
>>> >
>>> > If I create the same spark dataframe by reading data from S3, the same
>>> algorithm takes considerably more time.
>>> >
>>> > I don't understand why this is happening. Is this a chance occurence
>>> or are the spark dataframes created different?
>>> >
>>> > I don't understand how the data store would effect the algorithm
>>> performance.
>>> >
>>> > Any help would be appreciated. Thanks a lot.
>>>
>>
>
> --
> I appreciate your time,
>
> ~Randy
>

Re: Spark dataframe hdfs vs s3

Posted by randy clinton <ra...@gmail.com>.
Is the file Parquet on S3 or is it some other file format?

In general I would assume that HDFS read/writes are more performant for
spark jobs.

For instance, consider how well partitioned your HDFS file is vs the S3
file.

On Wed, May 27, 2020 at 1:51 PM Dark Crusader <re...@gmail.com>
wrote:

> Hi Jörn,
>
> Thanks for the reply. I will try to create a easier example to reproduce
> the issue.
>
> I will also try your suggestion to look into the UI. Can you guide on what
> I should be looking for?
>
> I was already using the s3a protocol to compare the times.
>
> My hunch is that multiple reads from S3 are required because of improper
> caching of intermediate data. And maybe hdfs is doing a better job at this.
> Does this make sense?
>
> I would also like to add that we built an extra layer on S3 which might be
> adding to even slower times.
>
> Thanks for your help.
>
> On Wed, 27 May, 2020, 11:03 pm Jörn Franke, <jo...@gmail.com> wrote:
>
>> Have you looked in Spark UI why this is the case ?
>> S3 Reading can take more time - it depends also what s3 url you are using
>> : s3a vs s3n vs S3.
>>
>> It could help after some calculation to persist in-memory or on HDFS. You
>> can also initially load from S3 and store on HDFS and work from there .
>>
>> HDFS offers Data locality for the tasks, ie the tasks start on the nodes
>> where the data is. Depending on what s3 „protocol“ you are using you might
>> be also more punished with performance.
>>
>> Try s3a as a protocol (replace all s3n with s3a).
>>
>> You can also use s3 url but this requires a special bucket configuration,
>> a dedicated empty bucket and it lacks some ineroperability with other AWS
>> services.
>>
>> Nevertheless, it could be also something else with the code. Can you post
>> an example reproducing the issue?
>>
>> > Am 27.05.2020 um 18:18 schrieb Dark Crusader <
>> relinquisheddragon@gmail.com>:
>> >
>> > 
>> > Hi all,
>> >
>> > I am reading data from hdfs in the form of parquet files (around 3 GB)
>> and running an algorithm from the spark ml library.
>> >
>> > If I create the same spark dataframe by reading data from S3, the same
>> algorithm takes considerably more time.
>> >
>> > I don't understand why this is happening. Is this a chance occurence or
>> are the spark dataframes created different?
>> >
>> > I don't understand how the data store would effect the algorithm
>> performance.
>> >
>> > Any help would be appreciated. Thanks a lot.
>>
>

-- 
I appreciate your time,

~Randy

Re: Spark dataframe hdfs vs s3

Posted by Dark Crusader <re...@gmail.com>.
Hi Jörn,

Thanks for the reply. I will try to create a easier example to reproduce
the issue.

I will also try your suggestion to look into the UI. Can you guide on what
I should be looking for?

I was already using the s3a protocol to compare the times.

My hunch is that multiple reads from S3 are required because of improper
caching of intermediate data. And maybe hdfs is doing a better job at this.
Does this make sense?

I would also like to add that we built an extra layer on S3 which might be
adding to even slower times.

Thanks for your help.

On Wed, 27 May, 2020, 11:03 pm Jörn Franke, <jo...@gmail.com> wrote:

> Have you looked in Spark UI why this is the case ?
> S3 Reading can take more time - it depends also what s3 url you are using
> : s3a vs s3n vs S3.
>
> It could help after some calculation to persist in-memory or on HDFS. You
> can also initially load from S3 and store on HDFS and work from there .
>
> HDFS offers Data locality for the tasks, ie the tasks start on the nodes
> where the data is. Depending on what s3 „protocol“ you are using you might
> be also more punished with performance.
>
> Try s3a as a protocol (replace all s3n with s3a).
>
> You can also use s3 url but this requires a special bucket configuration,
> a dedicated empty bucket and it lacks some ineroperability with other AWS
> services.
>
> Nevertheless, it could be also something else with the code. Can you post
> an example reproducing the issue?
>
> > Am 27.05.2020 um 18:18 schrieb Dark Crusader <
> relinquisheddragon@gmail.com>:
> >
> > 
> > Hi all,
> >
> > I am reading data from hdfs in the form of parquet files (around 3 GB)
> and running an algorithm from the spark ml library.
> >
> > If I create the same spark dataframe by reading data from S3, the same
> algorithm takes considerably more time.
> >
> > I don't understand why this is happening. Is this a chance occurence or
> are the spark dataframes created different?
> >
> > I don't understand how the data store would effect the algorithm
> performance.
> >
> > Any help would be appreciated. Thanks a lot.
>

Re: Spark dataframe hdfs vs s3

Posted by Jörn Franke <jo...@gmail.com>.
Have you looked in Spark UI why this is the case ? 
S3 Reading can take more time - it depends also what s3 url you are using : s3a vs s3n vs S3.

It could help after some calculation to persist in-memory or on HDFS. You can also initially load from S3 and store on HDFS and work from there . 

HDFS offers Data locality for the tasks, ie the tasks start on the nodes where the data is. Depending on what s3 „protocol“ you are using you might be also more punished with performance.

Try s3a as a protocol (replace all s3n with s3a).

You can also use s3 url but this requires a special bucket configuration, a dedicated empty bucket and it lacks some ineroperability with other AWS services.

Nevertheless, it could be also something else with the code. Can you post an example reproducing the issue?

> Am 27.05.2020 um 18:18 schrieb Dark Crusader <re...@gmail.com>:
> 
> 
> Hi all,
> 
> I am reading data from hdfs in the form of parquet files (around 3 GB) and running an algorithm from the spark ml library.
> 
> If I create the same spark dataframe by reading data from S3, the same algorithm takes considerably more time.
> 
> I don't understand why this is happening. Is this a chance occurence or are the spark dataframes created different? 
> 
> I don't understand how the data store would effect the algorithm performance.
> 
> Any help would be appreciated. Thanks a lot.

---------------------------------------------------------------------
To unsubscribe e-mail: user-unsubscribe@spark.apache.org