Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2021/07/23 07:37:24 UTC

[GitHub] [iceberg] wesselr opened a new issue #2853: Is it possible to write a partitioned and bucketed table with spark?

wesselr opened a new issue #2853:
URL: https://github.com/apache/iceberg/issues/2853


   Setup:
   - Spark: 3.0.2
   - Iceberg: 0.11.1
   
   Is it possible to write a table that is partitioned by day and bucketed by an id? I have had success writing either a partitioned or a bucketed table, but not the two combined. From what I understand, I need to define a UDF for the days transform so that I can sort by it and keep rows for the same partition together; otherwise I get `java.lang.IllegalStateException: Already closed files for partition`.
   
   `import org.apache.iceberg.spark.IcebergSpark`
   `import org.apache.spark.sql.types.DataTypes`
   `IcebergSpark.registerBucketUDF(spark, "id_bucket10", DataTypes.LongType, 10)`
   
   or
   
   `import org.apache.iceberg.transforms.Transforms`
   `import org.apache.iceberg.types.Types`
   `val bucketTransform = Transforms.bucket[java.lang.Long](Types.LongType.get(), 10)`
   `def bucketFunc(id: Long): Int = bucketTransform.apply(id)`
   `val id_bucket10 = spark.udf.register("id_bucket10", bucketFunc _)`
   
   Either of these gives me the desired bucket UDF for Spark, but I am struggling to create a similar UDF for days.
   
   `val daysTransform = Transforms.day[java.sql.Timestamp](Types.TimestampType.withZone())`
   `def daysFunc(ts: java.sql.Timestamp): Int = daysTransform.apply(ts)`
   `val iceberg_days = spark.udf.register("iceberg_days", daysFunc _)`
   
   Gives me: `org.apache.spark.SparkException: Failed to execute user defined function (timestamp) => int)` which makes sense but I am obviously missing something to get `Date` returned rather than an `Int`.
   
   In a nutshell I am trying to achieve:
   `df.sort(expr("iceberg_days(ts)"), expr("id_bucket10(id)")).writeTo("hive.test.partition_and_bucket").append()` 
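   
   The reason the sort matters can be sketched with a toy model. The class below is hypothetical and is not Iceberg's writer; it only assumes that a writer keeps one open file per task, closes it when the partition key changes, and refuses to reopen a key it has already closed. The names `Row`, `writeClustered`, and `sortByDayThenBucket` are invented for illustration, and the day and bucket values are assumed to be precomputed.
   
   ```java
   import java.util.ArrayList;
   import java.util.Comparator;
   import java.util.HashSet;
   import java.util.List;
   import java.util.Set;
   
   // Toy model only: mimics a writer that expects rows clustered by partition key.
   final class ClusteredWriteSketch {
       // Hypothetical row holding a precomputed day ordinal and bucket number.
       static final class Row {
           final int day;
           final int bucket;
   
           Row(int day, int bucket) {
               this.day = day;
               this.bucket = bucket;
           }
       }
   
       // Returns the number of partition "files" written, or throws if a
       // partition key reappears after its file has been closed.
       static int writeClustered(List<Row> rows) {
           Set<String> closed = new HashSet<>();
           String current = null;
           for (Row row : rows) {
               String key = row.day + "/" + row.bucket;
               if (!key.equals(current)) {
                   if (closed.contains(key)) {
                       throw new IllegalStateException("Already closed files for partition: " + key);
                   }
                   if (current != null) {
                       closed.add(current); // close the previous partition's file
                   }
                   current = key;
               }
           }
           return closed.size() + (current == null ? 0 : 1);
       }
   
       // Sorting by (day, bucket) makes every partition key contiguous.
       static List<Row> sortByDayThenBucket(List<Row> rows) {
           List<Row> sorted = new ArrayList<>(rows);
           sorted.sort(Comparator.comparingInt((Row r) -> r.day).thenComparingInt(r -> r.bucket));
           return sorted;
       }
   
       public static void main(String[] args) {
           List<Row> rows = new ArrayList<>();
           rows.add(new Row(1, 0));
           rows.add(new Row(2, 0));
           rows.add(new Row(1, 0)); // key 1/0 comes back after 2/0 was opened
   
           try {
               writeClustered(rows);
           } catch (IllegalStateException e) {
               System.out.println("unsorted: " + e.getMessage());
           }
           System.out.println("sorted: " + writeClustered(sortByDayThenBucket(rows)) + " files");
       }
   }
   ```
   
   In this model, sorting on only one of the two keys is not enough in general, because a (day, bucket) pair can still reappear later in the input; that is why the sort above uses both.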
   
   Thanks,
   Wessel 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] wesselr commented on issue #2853: Is it possible to write a partitioned and bucketed table with spark?

Posted by GitBox <gi...@apache.org>.
wesselr commented on issue #2853:
URL: https://github.com/apache/iceberg/issues/2853#issuecomment-887389462


   @openinx 
   
   Compiled and tested, then baked into my docker images.
   
   I was able to write the partitioned and bucketed table successfully. I checked counts and did some extra appends to see whether any rows were duplicated or lost. Writing worked flawlessly; I will continue investigating and report back if I find any issues.
   
   Thanks for the patch. Let me know if I can help with documentation for the `IcebergSpark` function registration methods when they are added.




[GitHub] [iceberg] openinx commented on issue #2853: Is it possible to write a partitioned and bucketed table with spark?

Posted by GitBox <gi...@apache.org>.
openinx commented on issue #2853:
URL: https://github.com/apache/iceberg/issues/2853#issuecomment-887403512


   @wesselr Thanks for the verification! If you have the bandwidth and want to contribute, you can prepare a PR that adds UDFs for all the built-in transforms to `IcebergSpark` and provides documentation for them.




[GitHub] [iceberg] wesselr commented on issue #2853: Is it possible to write a partitioned and bucketed table with spark?

Posted by GitBox <gi...@apache.org>.
wesselr commented on issue #2853:
URL: https://github.com/apache/iceberg/issues/2853#issuecomment-887413297


   @openinx 
   
   Sure, will try and make time this week or early next week. 
   
   Are there any guidelines I need to follow for PRs?




[GitHub] [iceberg] wesselr commented on issue #2853: Is it possible to write a partitioned and bucketed table with spark?

Posted by GitBox <gi...@apache.org>.
wesselr commented on issue #2853:
URL: https://github.com/apache/iceberg/issues/2853#issuecomment-885560362


   @openinx 
   
   Thanks, I'll give it a go when I get a chance. Thanks for the prompt response, much appreciated.
   
   Adding all the built-in transforms to `IcebergSpark` would be great.




[GitHub] [iceberg] openinx commented on issue #2853: Is it possible to write a partitioned and bucketed table with spark?

Posted by GitBox <gi...@apache.org>.
openinx commented on issue #2853:
URL: https://github.com/apache/iceberg/issues/2853#issuecomment-885523086


   @wesselr  I think this patch can address your issue: 
   
   ```patch
   From 9a7eb8abb7d909cef5d1feece9a47eefd9a91452 Mon Sep 17 00:00:00 2001
   From: huzheng <op...@gmail.com>
   Date: Fri, 23 Jul 2021 17:38:55 +0800
   Subject: [PATCH] Add day UDF
   
   ---
    .../apache/iceberg/spark/IcebergSpark.java    | 22 +++++++++++++++++++
    .../spark/source/TestIcebergSpark.java        |  6 +++++
    2 files changed, 28 insertions(+)
   
   diff --git a/spark/src/main/java/org/apache/iceberg/spark/IcebergSpark.java b/spark/src/main/java/org/apache/iceberg/spark/IcebergSpark.java
   index ac659f6c7..674df2119 100644
   --- a/spark/src/main/java/org/apache/iceberg/spark/IcebergSpark.java
   +++ b/spark/src/main/java/org/apache/iceberg/spark/IcebergSpark.java
   @@ -19,9 +19,11 @@
    
    package org.apache.iceberg.spark;
    
   +import java.sql.Timestamp;
    import org.apache.iceberg.transforms.Transform;
    import org.apache.iceberg.transforms.Transforms;
    import org.apache.iceberg.types.Type;
   +import org.apache.iceberg.util.DateTimeUtil;
    import org.apache.spark.sql.SparkSession;
    import org.apache.spark.sql.types.DataType;
    import org.apache.spark.sql.types.DataTypes;
   @@ -36,4 +38,24 @@ public class IcebergSpark {
        Transform<Object, Integer> bucket = Transforms.bucket(sourceIcebergType, numBuckets);
        session.udf().register(funcName, bucket::apply, DataTypes.IntegerType);
      }
   +
   +  public static void registerDayUDF(SparkSession session, String funcName, DataType sourceType) {
   +    SparkTypeToType typeConverter = new SparkTypeToType();
   +    Type sourceIcebergType = typeConverter.atomic(sourceType);
   +    Transform<Object, Integer> day = Transforms.day(sourceIcebergType);
   +    session.udf().register(funcName, o -> dayUDF(day, o), DataTypes.IntegerType);
   +  }
   +
   +  private static Integer dayUDF(Transform<Object, Integer> day, Object obj) {
   +    if (obj == null) {
   +      return day.apply(null);
   +    }
   +
   +    if (obj instanceof Timestamp) {
   +      Timestamp ts = (Timestamp) obj;
   +      return day.apply(DateTimeUtil.microsFromInstant(ts.toInstant()));
   +    } else {
   +      throw new IllegalArgumentException("Invalid type: " + obj.getClass());
   +    }
   +  }
    }
   diff --git a/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSpark.java b/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSpark.java
   index 14785d7f2..725ff955f 100644
   --- a/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSpark.java
   +++ b/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSpark.java
   @@ -67,4 +67,10 @@ public abstract class TestIcebergSpark {
        Assert.assertEquals((int) Transforms.bucket(Types.StringType.get(), 16).apply("hello"),
            results3.get(0).getInt(0));
      }
   +
   +  @Test
   +  public void testRegisterDayUDF() {
   +    IcebergSpark.registerDayUDF(spark, "iceberg_day", DataTypes.TimestampType);
   +    spark.sql("SELECT iceberg_day(to_timestamp('2016-12-31 00:12:00'))").collectAsList();
   +  }
    }
   -- 
   2.20.1 (Apple Git-117)
   ```
   
   You can run the added test case `testRegisterDayUDF` to verify it. Maybe we could add all the Iceberg built-in transforms to `IcebergSpark`'s function registration methods.
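   
   For readers without an Iceberg build at hand, the conversion that `dayUDF` relies on can be approximated with the JDK alone. This is a minimal sketch, not the patch's code: it assumes the day transform yields whole days since 1970-01-01 in UTC, and the class and method names (`DayOrdinalSketch`, `microsFromEpoch`, `dayOrdinal`) are made up for illustration. It shows why the UDF's return type is an integer rather than a `Date`.
   
   ```java
   import java.sql.Timestamp;
   import java.time.Instant;
   import java.time.temporal.ChronoUnit;
   
   // Illustrative only: approximates the timestamp -> day ordinal step with the JDK.
   final class DayOrdinalSketch {
       private static final long MICROS_PER_DAY = 86_400_000_000L;
   
       // Microseconds between 1970-01-01T00:00:00Z and the given timestamp.
       static long microsFromEpoch(Timestamp ts) {
           return ChronoUnit.MICROS.between(Instant.EPOCH, ts.toInstant());
       }
   
       // Whole days since the epoch; floorDiv keeps pre-1970 values on the
       // correct (earlier) day instead of rounding toward zero.
       static int dayOrdinal(Timestamp ts) {
           return (int) Math.floorDiv(microsFromEpoch(ts), MICROS_PER_DAY);
       }
   
       public static void main(String[] args) {
           // Same wall-clock value as the test case above, pinned to UTC here.
           Timestamp ts = Timestamp.from(Instant.parse("2016-12-31T00:12:00Z"));
           System.out.println(dayOrdinal(ts)); // prints 17166
       }
   }
   ```
   
   Note that the test case builds its timestamp with `to_timestamp`, which uses the Spark session time zone, so the ordinal it produces can differ from this UTC example when the session is not in UTC.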

