Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2021/07/31 09:25:42 UTC
[GitHub] [iceberg] GrigorievNick opened a new issue #2899: Spark3 Invalid partition transformation when try create Iceberg bucket partition with DataSourceV2 api.
GrigorievNick opened a new issue #2899:
URL: https://github.com/apache/iceberg/issues/2899
When I try to create an Iceberg table with Iceberg bucket partitioning through the DataFrame v2 API, I get:
```
Invalid partition transformation: iceberg_bucket2(`modification_time`)
org.apache.spark.sql.AnalysisException: Invalid partition transformation: iceberg_bucket2(`modification_time`)
at org.apache.spark.sql.DataFrameWriterV2.$anonfun$partitionedBy$2(DataFrameWriterV2.scala:102)
at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:285)
at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
at scala.collection.TraversableLike.map(TraversableLike.scala:285)
at scala.collection.TraversableLike.map$(TraversableLike.scala:278)
at scala.collection.AbstractTraversable.map(Traversable.scala:108)
at org.apache.spark.sql.DataFrameWriterV2.partitionedBy(DataFrameWriterV2.scala:88)
```
I checked the code. `DataFrameWriterV2.partitionedBy` in Spark SQL accepts only the transforms predefined in Spark (plus bare columns) and throws for any other expression:
```scala
@scala.annotation.varargs
override def partitionedBy(column: Column, columns: Column*): CreateTableWriter[T] = {
  def ref(name: String): NamedReference = LogicalExpressions.parseReference(name)

  val asTransforms = (column +: columns).map(_.expr).map {
    case Years(attr: Attribute) =>
      LogicalExpressions.years(ref(attr.name))
    case Months(attr: Attribute) =>
      LogicalExpressions.months(ref(attr.name))
    case Days(attr: Attribute) =>
      LogicalExpressions.days(ref(attr.name))
    case Hours(attr: Attribute) =>
      LogicalExpressions.hours(ref(attr.name))
    case Bucket(Literal(numBuckets: Int, IntegerType), attr: Attribute) =>
      LogicalExpressions.bucket(numBuckets, Array(ref(attr.name)))
    case attr: Attribute =>
      LogicalExpressions.identity(ref(attr.name))
    case expr =>
      // Any other expression, including a call to a registered UDF, ends up here.
      throw new AnalysisException(s"Invalid partition transformation: ${expr.sql}")
  }
  // ... (rest of the method omitted)
}
```
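To make that dispatch concrete without a Spark dependency, here is a toy model of it. The types `Expr`, `Attr`, `BucketExpr`, `DaysExpr`, `Call`, and `PartitionDispatch` are hypothetical stand-ins, not Spark's classes; the sketch only shows that a pattern match over known transform shapes sends any other function call, such as a registered UDF, to the catch-all branch.

```scala
// Toy model of the partitionedBy dispatch. These types are hypothetical
// stand-ins, not Spark's real Expression or Transform classes.
sealed trait Expr { def sql: String }

final case class Attr(name: String) extends Expr { def sql = s"`$name`" }

final case class BucketExpr(numBuckets: Int, child: Attr) extends Expr {
  def sql = s"bucket($numBuckets, ${child.sql})"
}

final case class DaysExpr(child: Attr) extends Expr {
  def sql = s"days(${child.sql})"
}

// Any other function call, for example a UDF registered by name.
final case class Call(fn: String, args: Seq[Expr]) extends Expr {
  def sql = s"$fn(${args.map(_.sql).mkString(", ")})"
}

object PartitionDispatch {
  // Mirrors the shape of Spark's match: known transforms and bare columns
  // are converted; everything else is rejected with the same message format.
  def toTransform(e: Expr): Either[String, String] = e match {
    case DaysExpr(Attr(n))      => Right(s"days($n)")
    case BucketExpr(k, Attr(n)) => Right(s"bucket($k, $n)")
    case Attr(n)                => Right(s"identity($n)")
    case other                  => Left(s"Invalid partition transformation: ${other.sql}")
  }
}
```

For example, `PartitionDispatch.toTransform(BucketExpr(2, Attr("modification_time")))` returns `Right(bucket(2, modification_time))`, while `PartitionDispatch.toTransform(Call("iceberg_bucket2", Seq(Attr("modification_time"))))` returns `Left(Invalid partition transformation: iceberg_bucket2(`modification_time`))`, the same text as the exception above.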
Code to reproduce:
```scala
import org.apache.iceberg.spark.IcebergSpark
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.DataTypes
import org.scalatest.FunSuite

class IcebergTest extends FunSuite {
  private val testPath = "/tmp/iceberg_cdc_test"
  private val testTable = "hdl.test_table"

  val sparkSession: SparkSession = SparkSession.builder()
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
    .config("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkSessionCatalog")
    .config("spark.sql.catalog.spark_catalog.type", "hadoop")
    .config("spark.sql.catalog.spark_catalog.warehouse", s"$testPath/iceberg_catalog/")
    .master("local")
    .getOrCreate()

  // Register Iceberg's bucket function (2 buckets over a LongType column) as a Spark UDF.
  IcebergSpark.registerBucketUDF(sparkSession, "iceberg_bucket2", DataTypes.LongType, 2)

  import sparkSession.implicits._

  test("Create table") {
    val expectedData = (0 until 10).map(id => (id, "data1", System.currentTimeMillis()))
    expectedData
      .toDF("id", "data", "modification_time")
      // Cluster rows by their Iceberg bucket before writing.
      .sortWithinPartitions(expr("iceberg_bucket2(modification_time)"))
      .writeTo(testTable)
      .using("iceberg")
      .tableProperty("write.format.default", "orc")
      // Throws AnalysisException: the UDF call is not a built-in partition transform.
      .partitionedBy(expr("iceberg_bucket2(modification_time)"))
      .createOrReplace()

    sparkSession
      .read
      .format("iceberg")
      .table(testTable)
      .show()
  }
}
```
This looks like a bug, but I am not sure how to fix it.
Any ideas?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org
[GitHub] [iceberg] candily commented on issue #2899: Spark3 Invalid partition transformation when try create Iceberg bucket partition with DataSourceV2 api.
candily commented on issue #2899:
URL: https://github.com/apache/iceberg/issues/2899#issuecomment-894646781
Re-reading this, I think the intent is that you should just use the [built-in Spark bucket transform](https://spark.apache.org/docs/3.1.1/api/scala/org/apache/spark/sql/functions$.html#bucket(numBuckets:Int,e:org.apache.spark.sql.Column):org.apache.spark.sql.Column) for initially specifying the output table partitioning, and use the Iceberg UDF for sorting within a partition.
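A minimal sketch of that suggestion, reusing the session settings from the repro in the issue. It is untested here and assumes Spark 3.x with the Iceberg Spark 3 runtime on the classpath; `BucketedWriteSketch` is a hypothetical name. `org.apache.spark.sql.functions.bucket(numBuckets: Int, e: Column)` builds the `Bucket` expression that `partitionedBy` recognizes, while the `iceberg_bucket2` UDF is used only to order rows within each task:

```scala
import org.apache.iceberg.spark.IcebergSpark
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{bucket, col, expr}
import org.apache.spark.sql.types.DataTypes

// Hypothetical example object; session settings copied from the repro.
object BucketedWriteSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
      .config("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkSessionCatalog")
      .config("spark.sql.catalog.spark_catalog.type", "hadoop")
      .config("spark.sql.catalog.spark_catalog.warehouse", "/tmp/iceberg_cdc_test/iceberg_catalog/")
      .master("local")
      .getOrCreate()
    import spark.implicits._

    // The UDF applies Iceberg's bucket function to the data; it is used
    // only for sorting and is never passed to partitionedBy.
    IcebergSpark.registerBucketUDF(spark, "iceberg_bucket2", DataTypes.LongType, 2)

    (0 until 10).map(id => (id, "data1", System.currentTimeMillis()))
      .toDF("id", "data", "modification_time")
      // Group rows of each task by their Iceberg bucket before writing.
      .sortWithinPartitions(expr("iceberg_bucket2(modification_time)"))
      .writeTo("hdl.test_table")
      .using("iceberg")
      .tableProperty("write.format.default", "orc")
      // Spark's built-in bucket transform; partitionedBy turns it into a
      // bucket Transform, which Iceberg maps to its own bucket partitioning.
      .partitionedBy(bucket(2, col("modification_time")))
      .createOrReplace()

    spark.table("hdl.test_table").show()
  }
}
```

The two functions play different roles: Spark's `bucket` only describes the table's partition spec, while the registered UDF computes Iceberg bucket ids on the rows so the data can be clustered by partition before the write.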
[GitHub] [iceberg] candily commented on issue #2899: Spark3 Invalid partition transformation when try create Iceberg bucket partition with DataSourceV2 api.
candily commented on issue #2899:
URL: https://github.com/apache/iceberg/issues/2899#issuecomment-891160498
I see the same error when following https://iceberg.apache.org/spark-writes/#writing-to-partitioned-tables.
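One pattern along the lines of that docs section avoids `partitionedBy` altogether: declare the bucket partitioning in SQL DDL and use the registered UDF only to sort the data before appending. This is an untested sketch that assumes the same session settings as the repro; `SqlDdlSketch` and the table name `hdl.test_table_ddl` are hypothetical:

```scala
import org.apache.iceberg.spark.IcebergSpark
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.expr
import org.apache.spark.sql.types.DataTypes

// Hypothetical example object; session settings copied from the repro.
object SqlDdlSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
      .config("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkSessionCatalog")
      .config("spark.sql.catalog.spark_catalog.type", "hadoop")
      .config("spark.sql.catalog.spark_catalog.warehouse", "/tmp/iceberg_cdc_test/iceberg_catalog/")
      .master("local")
      .getOrCreate()
    import spark.implicits._

    // Declare the bucket partitioning in DDL; Spark 3 accepts transforms
    // such as bucket(N, col) in PARTITIONED BY.
    spark.sql(
      """CREATE TABLE IF NOT EXISTS hdl.test_table_ddl (
        |  id INT,
        |  data STRING,
        |  modification_time BIGINT)
        |USING iceberg
        |PARTITIONED BY (bucket(2, modification_time))""".stripMargin)

    // The UDF is used only to sort rows by Iceberg bucket before the write.
    IcebergSpark.registerBucketUDF(spark, "iceberg_bucket2", DataTypes.LongType, 2)

    (0 until 10).map(id => (id, "data1", System.currentTimeMillis()))
      .toDF("id", "data", "modification_time")
      .sortWithinPartitions(expr("iceberg_bucket2(modification_time)"))
      .writeTo("hdl.test_table_ddl")
      .append()

    spark.table("hdl.test_table_ddl").show()
  }
}
```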