Posted to issues@spark.apache.org by "lichenglin (JIRA)" <ji...@apache.org> on 2016/11/11 16:04:58 UTC

[jira] [Updated] (SPARK-18413) Add a property to control the number of partitions when saving a JDBC RDD

     [ https://issues.apache.org/jira/browse/SPARK-18413?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

lichenglin updated SPARK-18413:
-------------------------------
    Description: 
{code}
CREATE or replace TEMPORARY VIEW resultview
USING org.apache.spark.sql.jdbc
OPTIONS (
  url "jdbc:oracle:thin:@10.129.10.111:1521:BKDB",
  dbtable "result",
  user "HIVE",
  password "HIVE"
);
--set spark.sql.shuffle.partitions=200
insert overwrite table resultview select g,count(1) as count from tnet.DT_LIVE_INFO group by g
{code}

I'm trying to save a Spark SQL result to Oracle, and I found that Spark creates one JDBC connection per partition. If the query produces too many partitions, the database can't accept that many concurrent connections and throws an exception.

In the situation above the partition count is 200, because the "group by" triggers a shuffle and "spark.sql.shuffle.partitions" defaults to 200.
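
As a workaround today, one can coalesce the result down to a fixed number of partitions before writing, which caps the number of connections. A minimal sketch, assuming a spark-shell style SparkSession named spark; the URL, table, and credentials are the placeholder values from the example above:
{code}
// Workaround sketch: shrink the result to a small, fixed number of
// partitions before the JDBC write, so at most that many connections open.
import java.util.Properties

val props = new Properties()
props.setProperty("user", "HIVE")
props.setProperty("password", "HIVE")

val result = spark.sql(
  "select g, count(1) as count from tnet.DT_LIVE_INFO group by g")

result
  .coalesce(20) // at most 20 partitions => at most 20 JDBC connections
  .write
  .mode("overwrite")
  .jdbc("jdbc:oracle:thin:@10.129.10.111:1521:BKDB", "result", props)
{code}
The drawback is that this requires dropping from pure SQL into the DataFrame API.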

The relevant Spark source, in JdbcUtils.saveTable, is:
{code}
def saveTable(
      df: DataFrame,
      url: String,
      table: String,
      properties: Properties) {
    val dialect = JdbcDialects.get(url)
    val nullTypes: Array[Int] = df.schema.fields.map { field =>
      getJdbcType(field.dataType, dialect).jdbcNullType
    }

    val rddSchema = df.schema
    val getConnection: () => Connection = createConnectionFactory(url, properties)
    val batchSize = properties.getProperty(JDBC_BATCH_INSERT_SIZE, "1000").toInt
    // savePartition opens one JDBC connection per partition, so the number
    // of concurrent connections equals the partition count of df.
    df.foreachPartition { iterator =>
      savePartition(getConnection, table, iterator, rddSchema, nullTypes, batchSize, dialect)
    }
  }
{code}

Maybe we could add a property so that saveTable calls df.repartition(num).foreachPartition instead?
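
For illustration, a hypothetical sketch of what that could look like inside saveTable; the "numPartitions" property name is just an assumption for this sketch, not an existing option:
{code}
// Hypothetical sketch only: honor an (assumed) "numPartitions" property and
// shrink the DataFrame before writing, capping concurrent JDBC connections.
val requested = Option(properties.getProperty("numPartitions")).map(_.toInt)
val toWrite = requested match {
  // coalesce rather than repartition: shrinking needs no extra shuffle
  case Some(n) if n < df.rdd.getNumPartitions => df.coalesce(n)
  case _ => df
}
toWrite.foreachPartition { iterator =>
  savePartition(getConnection, table, iterator, rddSchema, nullTypes, batchSize, dialect)
}
{code}
With such a property, the 200-partition result above could be written with, say, 20 connections no matter what spark.sql.shuffle.partitions is set to.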

In fact, the exception I got was "ORA-12519, TNS:no appropriate service handler found".

> Add a property to control the number of partitions when saving a JDBC RDD
> --------------------------------------------------------------------------
>
>                 Key: SPARK-18413
>                 URL: https://issues.apache.org/jira/browse/SPARK-18413
>             Project: Spark
>          Issue Type: Wish
>          Components: SQL
>    Affects Versions: 2.0.1
>            Reporter: lichenglin
>


