You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Sean Owen (JIRA)" <ji...@apache.org> on 2016/11/21 13:57:59 UTC

[jira] [Resolved] (SPARK-18413) Add a property to control the number of partitions when save a jdbc rdd

     [ https://issues.apache.org/jira/browse/SPARK-18413?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Sean Owen resolved SPARK-18413.
-------------------------------
       Resolution: Fixed
    Fix Version/s: 2.2.0

Issue resolved by pull request 15868
[https://github.com/apache/spark/pull/15868]

> Add a property to control the number of partitions when save a jdbc rdd
> -----------------------------------------------------------------------
>
>                 Key: SPARK-18413
>                 URL: https://issues.apache.org/jira/browse/SPARK-18413
>             Project: Spark
>          Issue Type: Wish
>          Components: SQL
>    Affects Versions: 2.0.1
>            Reporter: lichenglin
>             Fix For: 2.2.0
>
>
> {code}
> CREATE or replace TEMPORARY VIEW resultview
> USING org.apache.spark.sql.jdbc
> OPTIONS (
>   url "jdbc:oracle:thin:@10.129.10.111:1521:BKDB",
>   dbtable "result",
>   user "HIVE",
>   password "HIVE"
> );
> --set spark.sql.shuffle.partitions=200
> insert overwrite table resultview select g,count(1) as count from tnet.DT_LIVE_INFO group by g
> {code}
> I'm tring to save a spark sql result to oracle.
> And I found spark will create a jdbc connection for each partition.
> if the sql create to many partitions , the database can't hold so many connections and return exception.
> In above situation is 200 because of the "group by" and "spark.sql.shuffle.partitions"
> the spark source code JdbcUtil is
> {code}
> def saveTable(
>       df: DataFrame,
>       url: String,
>       table: String,
>       properties: Properties) {
>     val dialect = JdbcDialects.get(url)
>     val nullTypes: Array[Int] = df.schema.fields.map { field =>
>       getJdbcType(field.dataType, dialect).jdbcNullType
>     }
>     val rddSchema = df.schema
>     val getConnection: () => Connection = createConnectionFactory(url, properties)
>     val batchSize = properties.getProperty(JDBC_BATCH_INSERT_SIZE, "1000").toInt
>     df.foreachPartition { iterator =>
>       savePartition(getConnection, table, iterator, rddSchema, nullTypes, batchSize, dialect)
>     }
>   }
> {code}
> May be we can add a property for df.repartition(num).foreachPartition ?
> In fact I got an exception "ORA-12519, TNS:no appropriate service handler found"



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org