Posted to user@spark.apache.org by Ajay Chander <it...@gmail.com> on 2016/10/08 17:17:26 UTC
Code review / sqlContext Scope
Hi Everyone,
Can anyone tell me if there is anything wrong with my code flow below?
For each element in the text file I would like to run a query against a
Hive table and persist the results into another Hive table, and I want to
do this in parallel for every element in the file. I appreciate any of
your inputs on this.
$ cat /home/ajay/flds.txt
PHARMY_NPI_ID
ALT_SUPPLIER_STORE_NBR
MAIL_SERV_NBR
spark-shell --name hivePersistTest --master yarn --deploy-mode client
import scala.io.Source
import org.apache.spark.sql.DataFrame

val dataElementsFile = "/home/ajay/flds.txt"
val dataElements = Source.fromFile(dataElementsFile).getLines.toArray

// Build the per-element aggregation query against the source Hive table.
def calculateQuery(de: String): DataFrame = {
  sqlContext.sql("select 'UDA' as ds_nm, cyc_dt, supplier_proc_i as supplier_proc_id, '" + de + "' as data_elm, " + de + " as data_elm_val, " +
    "count(1) as derx_val_cnt, current_timestamp as load_dt " +
    "from SPRINT2_TEST2 " +
    "group by 'UDA', cyc_dt, supplier_proc_i, '" + de + "', " + de)
}

// Append the computed metrics to the target Hive table.
def persistResults(calculatedQuery: DataFrame) = {
  calculatedQuery.write.insertInto("sprint2_stp1_test2")
}

dataElements.map(calculateQuery).foreach(persistResults)
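One note on the last line: dataElements.map(calculateQuery).foreach(persistResults) is plain Scala collection code on the driver, so the per-element queries and inserts run one after another. If the goal is genuinely to run them in parallel, a minimal sketch (my illustration, not part of the original mail) is to drive the same two functions from a Scala parallel collection; a single SparkContext accepts job submissions from multiple threads, though concurrent use of one HiveContext is worth verifying on your Spark/Hive versions:

// Sketch: submit the per-element query/insert pairs from parallel driver threads.
// The degree of overlap is bounded by the parallel collection's thread pool.
dataElements.par.foreach { de =>
  persistResults(calculateQuery(de))
}

With several jobs in flight at once, setting spark.scheduler.mode=FAIR can help them share executors more evenly.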
Thanks.
Re: Code review / sqlContext Scope
Posted by Ajay Chander <it...@gmail.com>.
Can someone please shed some light on this? I wrote the code below in
Scala 2.10.5; can someone tell me if this is the right way of doing it?
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.hive.HiveContext

// Needs to be an object (not a class) so spark-submit can find the main method.
object Test {

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    val sc = new SparkContext(conf)
    val sqlContext = new HiveContext(sc)
    sqlContext.sql("set spark.sql.shuffle.partitions=1000")
    sqlContext.sql("set hive.exec.dynamic.partition.mode=nonstrict")

    val dataElementsFile = "hdfs://nameservice/user/ajay/spark/flds.txt"

    // deDF has only 61 rows
    val deDF = sqlContext.read.text(dataElementsFile).toDF("DataElement").coalesce(1).distinct().cache()

    deDF.withColumn("ds_nm", lit("UDA"))
      .withColumn("tabl_nm", lit("TEST_DB.TEST_TABLE"))
      .collect()
      .filter(filterByDataset)
      .map(calculateMetricsAtDELevel)
      .foreach(persistResults)

    // If ds_nm starts with 'RAW_' I don't want to process it.
    def filterByDataset(de: Row): Boolean = {
      val datasetName = de.getAs[String]("ds_nm").trim
      !datasetName.startsWith("RAW_")
    }

    def calculateMetricsAtDELevel(de: Row): DataFrame = {
      val dataElement = de.getAs[String]("DataElement").trim
      val datasetName = de.getAs[String]("ds_nm").trim
      val tableName = de.getAs[String]("tabl_nm").trim

      // udaDF holds 107,762,849 rows * 412 columns / 105 files in HDFS, 176.5 GB * 3 replication factor
      val udaDF = sqlContext.sql("SELECT '" + datasetName + "' as ds_nm, cyc_dt, supplier_proc_i, " +
        "'" + dataElement + "' as data_elm, " + dataElement + " as data_elm_val FROM " + tableName)
      println("udaDF num partitions: " + udaDF.toJavaRDD.getNumPartitions)
      // udaDF.toJavaRDD.getNumPartitions = 1490

      val calculatedMetrics = udaDF.groupBy("ds_nm", "cyc_dt", "supplier_proc_i", "data_elm", "data_elm_val").count()
      println("calculatedMetrics num partitions: " + calculatedMetrics.toJavaRDD.getNumPartitions)
      // calculatedMetrics.toJavaRDD.getNumPartitions = 1000, since spark.sql.shuffle.partitions is set to 1000 above

      val adjustedSchemaDF = calculatedMetrics.withColumnRenamed("count", "derx_val_cnt")
        .withColumn("load_dt", current_timestamp())
      println("adjustedSchemaDF num partitions: " + adjustedSchemaDF.toJavaRDD.getNumPartitions)
      // adjustedSchemaDF.toJavaRDD.getNumPartitions = 1000 as well

      adjustedSchemaDF
    }

    def persistResults(adjustedSchemaDF: DataFrame) = {
      // Persist the results into a Hive table backed by PARQUET.
      adjustedSchemaDF.write.partitionBy("ds_nm", "cyc_dt").mode("Append")
        .insertInto("devl_df2_spf_batch.spf_supplier_trans_metric_detl_base_1")
    }
  }
}
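The same parallelism point from the first mail applies here: collect() pulls the 61 rows to the driver, and the following filter/map/foreach submit the per-element jobs one at a time. A possible variant is sketched below; it assumes it replaces that chain inside main (so deDF, filterByDataset, calculateMetricsAtDELevel, and persistResults are in scope), the pool size of 4 is an arbitrary placeholder, and concurrent inserts through one HiveContext should be validated on your Spark/Hive versions.

import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration.Duration

// Bounded pool: at most 4 per-element jobs submitted to Spark at once.
implicit val ec = ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(4))

val rows = deDF.withColumn("ds_nm", lit("UDA"))
  .withColumn("tabl_nm", lit("TEST_DB.TEST_TABLE"))
  .collect()
  .filter(filterByDataset)

// Each Future runs one groupBy query plus the insert into the target table.
val jobs = rows.map(row => Future { persistResults(calculateMetricsAtDELevel(row)) })

// Block until every per-element insert has completed before main returns.
Await.result(Future.sequence(jobs.toSeq), Duration.Inf)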
This is my cluster configuration (Spark 1.6.0 on YARN, Cloudera 5.7.1):
Memory -> 4.10 TB
VCores -> 544
I am deploying the application in YARN client mode, and the cluster is
set to use Dynamic Memory Allocation.
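For reference, assuming "Dynamic Memory Allocation" here means Spark's dynamic executor allocation on YARN, it is normally driven by settings along these lines; the values below are placeholders for illustration, not the cluster's actual configuration, and can go in spark-defaults.conf or on the SparkConf before the SparkContext is created:

import org.apache.spark.SparkConf

// Placeholder values; dynamic allocation on YARN also needs the external
// shuffle service enabled on the NodeManagers.
val conf = new SparkConf()
  .set("spark.dynamicAllocation.enabled", "true")
  .set("spark.shuffle.service.enabled", "true")
  .set("spark.dynamicAllocation.minExecutors", "2")
  .set("spark.dynamicAllocation.maxExecutors", "100")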
Any pointers are appreciated.
Thank you