Posted to user@spark.apache.org by Moises Baly <mo...@urban4m.com> on 2015/07/10 16:04:37 UTC

Fwd: SparkSQL Postgres balanced partition of DataFrames

Hi,

I have a very simple setup of SparkSQL connecting to a Postgres DB, and I'm
trying to get a DataFrame from a table with a given number X of partitions
(let's say 2). The code is the following:

import java.util.HashMap;
import java.util.Map;

import org.apache.spark.sql.DataFrame;

Map<String, String> options = new HashMap<String, String>();
options.put("url", DB_URL);
options.put("driver", POSTGRES_DRIVER);
// a subquery used as dbtable has to be wrapped in parentheses and aliased
options.put("dbtable", "(select ID, OTHER from TABLE limit 1000) as t");
options.put("partitionColumn", "ID");
options.put("lowerBound", "100");
options.put("upperBound", "500");
options.put("numPartitions", "2");
DataFrame housingDataFrame =
    sqlContext.read().format("jdbc").options(options).load();
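
To make my reading of the docs concrete, here is a small standalone sketch
of how I think those options get turned into per-partition queries. This is
my own reconstruction from the documentation, not the actual code of the
JDBC data source, so the exact stride formula may well differ:

import java.util.ArrayList;
import java.util.List;

// Rough reconstruction (my assumption) of how the JDBC source splits the
// read: the bounds only define the stride of the partition predicates,
// they never filter any rows out.
public class PartitionSketch {
    public static void main(String[] args) {
        long lower = 100, upper = 500;
        int numPartitions = 2;
        long stride = (upper - lower) / numPartitions; // 200

        List<String> predicates = new ArrayList<String>();
        long current = lower;
        for (int i = 0; i < numPartitions; i++) {
            String lowerClause = (i == 0) ? null : "ID >= " + current;
            current += stride;
            String upperClause =
                (i == numPartitions - 1) ? null : "ID < " + current;
            if (lowerClause == null) {
                predicates.add(upperClause);              // first partition: open below
            } else if (upperClause == null) {
                predicates.add(lowerClause);              // last partition: open above
            } else {
                predicates.add(lowerClause + " AND " + upperClause);
            }
        }
        System.out.println(predicates); // prints [ID < 300, ID >= 300]
    }
}

If that is roughly right, every row with ID below 300 lands in the first
partition and everything from 300 up lands in the second, which would
explain the skew whenever most IDs fall on one side of that boundary.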

For some reason, one partition of the DataFrame contains almost all rows.

From what I can understand, lowerBound/upperBound are the parameters used to
fine-tune this. SparkSQL's documentation (Spark 1.4.0 - spark-sql_2.11) says
they are used to define the stride, not to filter/range the partition
column. But that raises several questions:

   1. Is the stride the frequency (number of elements returned by each
   query) with which Spark will query the DB for each executor (partition)?
   2. If not, what is the purpose of these parameters, what do they depend
   on, and how can I balance my DataFrame partitions in a stable way? I'm
   not asking that all partitions contain the same number of elements, just
   that there is an equilibrium - for example, with 2 partitions and 100
   elements, 55/45, 60/40 or even 65/35 would do (see the sketch after this
   list for what I'm currently guessing the fix is).
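
For question 2, what I'm currently guessing I should do (untested sketch,
reusing sqlContext, DB_URL and POSTGRES_DRIVER from above, and assuming the
IDs are spread fairly uniformly) is to look up the real min/max of ID first
and pass those in as the bounds, so that the stride covers the whole range:

// needs org.apache.spark.sql.Row on top of the imports above
Row bounds = sqlContext.read().format("jdbc")
        .option("url", DB_URL)
        .option("driver", POSTGRES_DRIVER)
        .option("dbtable", "(select min(ID) as lo, max(ID) as hi from TABLE) as b")
        .load()
        .first();
long lo = ((Number) bounds.get(0)).longValue(); // works whether ID is int or bigint
long hi = ((Number) bounds.get(1)).longValue();

Map<String, String> options = new HashMap<String, String>();
options.put("url", DB_URL);
options.put("driver", POSTGRES_DRIVER);
options.put("dbtable", "(select ID, OTHER from TABLE) as t");
options.put("partitionColumn", "ID");
options.put("lowerBound", String.valueOf(lo));
options.put("upperBound", String.valueOf(hi));
options.put("numPartitions", "2");
DataFrame balanced = sqlContext.read().format("jdbc").options(options).load();

Is that the intended way to use these parameters, or am I off track here?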

I can't seem to find a clear answer to these questions and was wondering if
some of you could clear these points up for me, because right now this is
affecting my cluster performance: when processing X million rows, all the
heavy lifting goes to one single executor.

Thank you for your time,

Moises Baly