Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2021/07/31 09:25:42 UTC

[GitHub] [iceberg] GrigorievNick opened a new issue #2899: Spark3 Invalid partition transformation when try create Iceberg bucket partition with DataSourceV2 api.

GrigorievNick opened a new issue #2899:
URL: https://github.com/apache/iceberg/issues/2899


   When I try to create an Iceberg table with Iceberg bucketing through the DataFrame v2 API, I get:
   
   ```
   Invalid partition transformation: iceberg_bucket2(`modification_time`)
   org.apache.spark.sql.AnalysisException: Invalid partition transformation: iceberg_bucket2(`modification_time`)
   	at org.apache.spark.sql.DataFrameWriterV2.$anonfun$partitionedBy$2(DataFrameWriterV2.scala:102)
   	at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:285)
   	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
   	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
   	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
   	at scala.collection.TraversableLike.map(TraversableLike.scala:285)
   	at scala.collection.TraversableLike.map$(TraversableLike.scala:278)
   	at scala.collection.AbstractTraversable.map(Traversable.scala:108)
   	at org.apache.spark.sql.DataFrameWriterV2.partitionedBy(DataFrameWriterV2.scala:88)
   
   ```
   I checked the code, and Spark SQL does not accept any transformation there except the ones predefined in Spark:
   ```
   @scala.annotation.varargs
   override def partitionedBy(column: Column, columns: Column*): CreateTableWriter[T] = {
     def ref(name: String): NamedReference = LogicalExpressions.parseReference(name)

     val asTransforms = (column +: columns).map(_.expr).map {
       case Years(attr: Attribute) =>
         LogicalExpressions.years(ref(attr.name))
       case Months(attr: Attribute) =>
         LogicalExpressions.months(ref(attr.name))
       case Days(attr: Attribute) =>
         LogicalExpressions.days(ref(attr.name))
       case Hours(attr: Attribute) =>
         LogicalExpressions.hours(ref(attr.name))
       case Bucket(Literal(numBuckets: Int, IntegerType), attr: Attribute) =>
         LogicalExpressions.bucket(numBuckets, Array(ref(attr.name)))
       case attr: Attribute =>
         LogicalExpressions.identity(ref(attr.name))
       case expr =>
         throw new AnalysisException(s"Invalid partition transformation: ${expr.sql}")
     }
     // ... rest of the method omitted
   
   ```
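
   A registered UDF call such as `iceberg_bucket2(modification_time)` matches none of these cases, so it falls through to the final `case expr` and raises the AnalysisException above. Only Spark's own partition transform functions from `org.apache.spark.sql.functions` (`years`, `months`, `days`, `hours`, `bucket`) or a bare column survive the match. A minimal sketch of a call shape that does pass (hypothetical `df` and table name, assuming Spark 3.x):
   ```
   import org.apache.spark.sql.functions.{bucket, col}

   // partitionedBy only accepts Spark's built-in transforms:
   // years/months/days/hours(col), bucket(n, col), or a plain column.
   df.writeTo("db.tbl")
     .using("iceberg")
     .partitionedBy(bucket(2, col("modification_time")))
     .create()
   ```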
   Code to reproduce:
   ```
   import org.apache.iceberg.spark.IcebergSpark
   import org.apache.spark.sql.SparkSession
   import org.apache.spark.sql.functions._
   import org.apache.spark.sql.types.DataTypes
   import org.scalatest.FunSuite
   
   class IcebergTest extends FunSuite {
   
     private val testPath = "/tmp/iceberg_cdc_test"
     private val testTable = "hdl.test_table"
   
     val sparkSession: SparkSession = SparkSession.builder()
       .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions ")
       .config("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkSessionCatalog")
       .config("spark.sql.catalog.spark_catalog.type", "hadoop")
       .config("spark.sql.catalog.spark_catalog.warehouse", s"$testPath/iceberg_catalog/")
       .master("local")
       .getOrCreate()
   
     IcebergSpark.registerBucketUDF(sparkSession, "iceberg_bucket2", DataTypes.LongType, 2)
   
     import sparkSession.implicits._
     test("Create table") {
   
       val expectedDate = (0 until 10).map(id => (id, "data1", System.currentTimeMillis()))
       expectedDate
         .toDF("id", "data", "modification_time")
         .sortWithinPartitions(expr("iceberg_bucket2(modification_time)"))
         .writeTo(testTable)
         .using("iceberg")
         .tableProperty("write.format.default", "orc")
         .partitionedBy(expr("iceberg_bucket2(modification_time)"))
         .createOrReplace()
   
       sparkSession
         .read
         .format("iceberg")
         .table(testTable)
         .show()
   
     }
   
   }
   ```
   
   Looks like a bug, but I am not sure how to fix it.
   Any ideas?



[GitHub] [iceberg] candily commented on issue #2899: Spark3 Invalid partition transformation when try create Iceberg bucket partition with DataSourceV2 api.

Posted by GitBox <gi...@apache.org>.
candily commented on issue #2899:
URL: https://github.com/apache/iceberg/issues/2899#issuecomment-894646781


   Re-reading this, I think the intent is that you should just use the [built-in Spark bucket transform](https://spark.apache.org/docs/3.1.1/api/scala/org/apache/spark/sql/functions$.html#bucket(numBuckets:Int,e:org.apache.spark.sql.Column):org.apache.spark.sql.Column) for initially specifying the output table partitioning, and use the Iceberg UDF for sorting within a partition.
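
   In other words, the repro above should start working once only the `sortWithinPartitions` call keeps the registered UDF. A minimal sketch against the original snippet (`expectedDate` and `testTable` as defined there; untested):
   ```
   import org.apache.spark.sql.functions.{bucket, col, expr}

   expectedDate
     .toDF("id", "data", "modification_time")
     // the Iceberg-registered UDF is fine for sorting within tasks
     .sortWithinPartitions(expr("iceberg_bucket2(modification_time)"))
     .writeTo(testTable)
     .using("iceberg")
     .tableProperty("write.format.default", "orc")
     // but the table partitioning must use Spark's built-in bucket transform,
     // which DataFrameWriterV2 pattern-matches and converts for Iceberg
     .partitionedBy(bucket(2, col("modification_time")))
     .createOrReplace()
   ```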




[GitHub] [iceberg] candily commented on issue #2899: Spark3 Invalid partition transformation when try create Iceberg bucket partition with DataSourceV2 api.

Posted by GitBox <gi...@apache.org>.
candily commented on issue #2899:
URL: https://github.com/apache/iceberg/issues/2899#issuecomment-891160498


   I see the same error when trying the example at https://iceberg.apache.org/spark-writes/#writing-to-partitioned-tables.

