Posted to issues@spark.apache.org by "Hu Fuwang (Jira)" <ji...@apache.org> on 2019/10/24 07:17:00 UTC

[jira] [Commented] (SPARK-29586) spark jdbc method param lowerBound and upperBound DataType wrong

    [ https://issues.apache.org/jira/browse/SPARK-29586?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16958610#comment-16958610 ] 

Hu Fuwang commented on SPARK-29586:
-----------------------------------

I am working on this.

> spark jdbc method param lowerBound and upperBound DataType wrong
> ----------------------------------------------------------------
>
>                 Key: SPARK-29586
>                 URL: https://issues.apache.org/jira/browse/SPARK-29586
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL
>    Affects Versions: 2.4.4, 3.0.0
>            Reporter: daile
>            Priority: Major
>
>  
> {code:scala}
> private def toBoundValueInWhereClause(
>     value: Long,
>     columnType: DataType,
>     timeZoneId: String): String = {
>   def dateTimeToString(): String = {
>     val dateTimeStr = columnType match {
>       case DateType => DateFormatter().format(value.toInt)
>       case TimestampType =>
>         val timestampFormatter = TimestampFormatter.getFractionFormatter(
>           DateTimeUtils.getZoneId(timeZoneId))
>         DateTimeUtils.timestampToString(timestampFormatter, value)
>     }
>     s"'$dateTimeStr'"
>   }
>   columnType match {
>     case _: NumericType => value.toString
>     case DateType | TimestampType => dateTimeToString()
>   }
> }{code}
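> The counterpart on the read side, {{JDBCRelation.toInternalBoundValue}} (the frame at the top of the stack trace below), parses the user-supplied bound *string* into an internal Long according to the partition column type. Roughly (simplified paraphrase for illustration, not the exact source; the helper names are placeholders):
> {code:scala}
> // Simplified paraphrase of JDBCRelation.toInternalBoundValue, for
> // illustration only; parseDateToEpochDays/parseTimestampToMicros are
> // placeholder names, not real Spark helpers.
> private def toInternalBoundValue(value: String, columnType: DataType): Long =
>   columnType match {
>     case _: NumericType => value.toLong
>     // "1571899768024" is not a valid date string, hence the
>     // IllegalArgumentException("Cannot parse the bound value ... as date")
>     case DateType => parseDateToEpochDays(value)
>     case TimestampType => parseTimestampToMicros(value)
>   }
> {code}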
> partitionColumn supports NumericType, DateType, and TimestampType, but the jdbc method only accepts Long for lowerBound and upperBound.
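> For reference, the overload in question on {{DataFrameReader}} types both bounds as Long (signature as of 2.4.4/3.0.0):
> {code:scala}
> def jdbc(
>     url: String,
>     table: String,
>     columnName: String,
>     lowerBound: Long,
>     upperBound: Long,
>     numPartitions: Int,
>     connectionProperties: Properties): DataFrame
> {code}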
> test("jdbc Suite2") {
>   val df = spark
>     .read
>     .option("partitionColumn", "B")
>     .option("lowerBound", "2017-01-01 10:00:00")
>     .option("upperBound", "2019-01-01 10:00:00")
>     .option("numPartitions", 5)
>     .jdbc(urlWithUserAndPass, "TEST.TIMETYPES",  new Properties())
>   df.printSchema()
>   df.show()
> }
> test("jdbc Suite2") {
>   val df = spark
>     .read
>     .option("partitionColumn", "B")
>     .option("lowerBound", "2017-01-01 10:00:00")
>     .option("upperBound", "2019-01-01 10:00:00")
>     .option("numPartitions", 5)
>     .jdbc(urlWithUserAndPass, "TEST.TIMETYPES",  new Properties())
>   df.printSchema()
>   df.show()
> }
> test("jdbc Suite") {
>   val df = spark.read.jdbc(urlWithUserAndPass, "TEST.TIMETYPES", "B", 1571899768024L, 1571899768024L, 5, new Properties())
>   df.printSchema()
>   df.show()
> }
> {code}
> java.lang.IllegalArgumentException: Cannot parse the bound value 1571899768024 as date
>   at org.apache.spark.sql.execution.datasources.jdbc.JDBCRelation$.$anonfun$toInternalBoundValue$1(JDBCRelation.scala:184)
>   at scala.Option.getOrElse(Option.scala:189)
>   at org.apache.spark.sql.execution.datasources.jdbc.JDBCRelation$.parse$1(JDBCRelation.scala:183)
>   at org.apache.spark.sql.execution.datasources.jdbc.JDBCRelation$.toInternalBoundValue(JDBCRelation.scala:189)
>   at org.apache.spark.sql.execution.datasources.jdbc.JDBCRelation$.columnPartition(JDBCRelation.scala:88)
>   at org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.createRelation(JdbcRelationProvider.scala:36)
>   at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:339)
>   at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:240)
>   at org.apache.spark.sql.DataFrameReader.$anonfun$load$2(DataFrameReader.scala:229)
>   at scala.Option.getOrElse(Option.scala:189)
>   at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:229)
>   at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:179)
>   at org.apache.spark.sql.DataFrameReader.jdbc(DataFrameReader.scala:255)
>   at org.apache.spark.sql.DataFrameReader.jdbc(DataFrameReader.scala:297)
>   at org.apache.spark.sql.jdbc.JDBCSuite.$anonfun$new$186(JDBCSuite.scala:1664)
>   at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
>   at org.scalatest.OutcomeOf.outcomeOf(OutcomeOf.scala:85)
>   at org.scalatest.OutcomeOf.outcomeOf$(OutcomeOf.scala:83)
>   at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
>   at org.scalatest.Transformer.apply(Transformer.scala:22)
>   at org.scalatest.Transformer.apply(Transformer.scala:20)
>   at org.scalatest.FunSuiteLike$$anon$1.apply(FunSuiteLike.scala:186)
>   at org.apache.spark.SparkFunSuite.withFixture(SparkFunSuite.scala:149)
>   at org.scalatest.FunSuiteLike.invokeWithFixture$1(FunSuiteLike.scala:184)
>   at org.scalatest.FunSuiteLike.$anonfun$runTest$1(FunSuiteLike.scala:196)
>   at org.scalatest.SuperEngine.runTestImpl(Engine.scala:289)
>   at org.scalatest.FunSuiteLike.runTest(FunSuiteLike.scala:196)
>   at org.scalatest.FunSuiteLike.runTest$(FunSuiteLike.scala:178)
>   at org.apache.spark.SparkFunSuite.org$scalatest$BeforeAndAfterEach$$super$runTest(SparkFunSuite.scala:56)
>   at org.scalatest.BeforeAndAfterEach.runTest(BeforeAndAfterEach.scala:221)
>   at org.scalatest.BeforeAndAfterEach.runTest$(BeforeAndAfterEach.scala:214)
>   at org.apache.spark.sql.jdbc.JDBCSuite.org$scalatest$BeforeAndAfter$$super$runTest(JDBCSuite.scala:43)
>   at org.scalatest.BeforeAndAfter.runTest(BeforeAndAfter.scala:203)
>   at org.scalatest.BeforeAndAfter.runTest$(BeforeAndAfter.scala:192)
>   at org.apache.spark.sql.jdbc.JDBCSuite.runTest(JDBCSuite.scala:43)
>   at org.scalatest.FunSuiteLike.$anonfun$runTests$1(FunSuiteLike.scala:229)
>   at org.scalatest.SuperEngine.$anonfun$runTestsInBranch$1(Engine.scala:396)
>   at scala.collection.immutable.List.foreach(List.scala:392)
>   at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:384)
>   at org.scalatest.SuperEngine.runTestsInBranch(Engine.scala:379)
>   at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:461)
>   at org.scalatest.FunSuiteLike.runTests(FunSuiteLike.scala:229)
>   at org.scalatest.FunSuiteLike.runTests$(FunSuiteLike.scala:228)
>   at org.scalatest.FunSuite.runTests(FunSuite.scala:1560)
>   at org.scalatest.Suite.run(Suite.scala:1147)
>   at org.scalatest.Suite.run$(Suite.scala:1129)
>   at org.scalatest.FunSuite.org$scalatest$FunSuiteLike$$super$run(FunSuite.scala:1560)
>   at org.scalatest.FunSuiteLike.$anonfun$run$1(FunSuiteLike.scala:233)
>   at org.scalatest.SuperEngine.runImpl(Engine.scala:521)
>   at org.scalatest.FunSuiteLike.run(FunSuiteLike.scala:233)
>   at org.scalatest.FunSuiteLike.run$(FunSuiteLike.scala:232)
>   at org.apache.spark.SparkFunSuite.org$scalatest$BeforeAndAfterAll$$super$run(SparkFunSuite.scala:56)
>   at org.scalatest.BeforeAndAfterAll.liftedTree1$1(BeforeAndAfterAll.scala:213)
>   at org.scalatest.BeforeAndAfterAll.run(BeforeAndAfterAll.scala:210)
>   at org.scalatest.BeforeAndAfterAll.run$(BeforeAndAfterAll.scala:208)
>   at org.apache.spark.sql.jdbc.JDBCSuite.org$scalatest$BeforeAndAfter$$super$run(JDBCSuite.scala:43)
>   at org.scalatest.BeforeAndAfter.run(BeforeAndAfter.scala:258)
>   at org.scalatest.BeforeAndAfter.run$(BeforeAndAfter.scala:256)
>   at org.apache.spark.sql.jdbc.JDBCSuite.run(JDBCSuite.scala:43)
>   at org.scalatest.tools.SuiteRunner.run(SuiteRunner.scala:45)
>   at org.scalatest.tools.Runner$.$anonfun$doRunRunRunDaDoRunRun$13(Runner.scala:1346)
>   at org.scalatest.tools.Runner$.$anonfun$doRunRunRunDaDoRunRun$13$adapted(Runner.scala:1340)
>   at scala.collection.immutable.List.foreach(List.scala:392)
>   at org.scalatest.tools.Runner$.doRunRunRunDaDoRunRun(Runner.scala:1340)
>   at org.scalatest.tools.Runner$.$anonfun$runOptionallyWithPassFailReporter$24(Runner.scala:1031)
>   at org.scalatest.tools.Runner$.$anonfun$runOptionallyWithPassFailReporter$24$adapted(Runner.scala:1010)
>   at org.scalatest.tools.Runner$.withClassLoaderAndDispatchReporter(Runner.scala:1506)
>   at org.scalatest.tools.Runner$.runOptionallyWithPassFailReporter(Runner.scala:1010)
>   at org.scalatest.tools.Runner$.run(Runner.scala:850)
>   at org.scalatest.tools.Runner.run(Runner.scala)
>   at org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.runScalaTest2(ScalaTestRunner.java:131)
>   at org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.main(ScalaTestRunner.java:28)
> 00:03:34.139 WARN org.apache.spark.sql.jdbc.JDBCSuite: 
> ===== POSSIBLE THREAD LEAK IN SUITE o.a.s.sql.jdbc.JDBCSuite, thread names: Abandoned connection cleanup thread =====
> {code}
>  
> The options-based reads above work fine; only the jdbc method fails, because its lowerBound/upperBound parameters are typed as Long and a Long cannot express a date or timestamp bound.
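> Until the method signature changes, the bounds can be passed as strings through options. A hypothetical sketch of the requested API as a caller-side wrapper ({{jdbcWithStringBounds}} is an invented name, not a Spark API):
> {code:scala}
> import java.util.Properties
> import org.apache.spark.sql.{DataFrame, DataFrameReader}
> 
> // Hypothetical sketch, not the actual Spark patch: a String-typed variant
> // of jdbc() that routes the bounds through the options path, which already
> // parses numeric, date, and timestamp bounds by the partition column type.
> object JdbcStringBounds {
>   implicit class RichJdbcReader(reader: DataFrameReader) {
>     def jdbcWithStringBounds(
>         url: String,
>         table: String,
>         columnName: String,
>         lowerBound: String, // e.g. "2017-01-01 10:00:00"
>         upperBound: String,
>         numPartitions: Int,
>         connectionProperties: Properties): DataFrame = {
>       reader
>         .option("partitionColumn", columnName)
>         .option("lowerBound", lowerBound)
>         .option("upperBound", upperBound)
>         .option("numPartitions", numPartitions.toString)
>         .jdbc(url, table, connectionProperties)
>     }
>   }
> }
> {code}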



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
