Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2021/07/23 09:44:28 UTC

[GitHub] [iceberg] openinx commented on issue #2853: Is it possible to write a partitioned and bucketed table with spark?

openinx commented on issue #2853:
URL: https://github.com/apache/iceberg/issues/2853#issuecomment-885523086


   @wesselr I think this patch can address your issue:
   
   ```patch
   From 9a7eb8abb7d909cef5d1feece9a47eefd9a91452 Mon Sep 17 00:00:00 2001
   From: huzheng <op...@gmail.com>
   Date: Fri, 23 Jul 2021 17:38:55 +0800
   Subject: [PATCH] Add day UDF
   
   ---
    .../apache/iceberg/spark/IcebergSpark.java    | 22 +++++++++++++++++++
    .../spark/source/TestIcebergSpark.java        |  6 +++++
    2 files changed, 28 insertions(+)
   
   diff --git a/spark/src/main/java/org/apache/iceberg/spark/IcebergSpark.java b/spark/src/main/java/org/apache/iceberg/spark/IcebergSpark.java
   index ac659f6c7..674df2119 100644
   --- a/spark/src/main/java/org/apache/iceberg/spark/IcebergSpark.java
   +++ b/spark/src/main/java/org/apache/iceberg/spark/IcebergSpark.java
   @@ -19,9 +19,11 @@
    
    package org.apache.iceberg.spark;
    
   +import java.sql.Timestamp;
    import org.apache.iceberg.transforms.Transform;
    import org.apache.iceberg.transforms.Transforms;
    import org.apache.iceberg.types.Type;
   +import org.apache.iceberg.util.DateTimeUtil;
    import org.apache.spark.sql.SparkSession;
    import org.apache.spark.sql.types.DataType;
    import org.apache.spark.sql.types.DataTypes;
   @@ -36,4 +38,24 @@ public class IcebergSpark {
        Transform<Object, Integer> bucket = Transforms.bucket(sourceIcebergType, numBuckets);
        session.udf().register(funcName, bucket::apply, DataTypes.IntegerType);
      }
   +
   +  public static void registerDayUDF(SparkSession session, String funcName, DataType sourceType) {
   +    SparkTypeToType typeConverter = new SparkTypeToType();
   +    Type sourceIcebergType = typeConverter.atomic(sourceType);
   +    Transform<Object, Integer> day = Transforms.day(sourceIcebergType);
   +    session.udf().register(funcName, o -> dayUDF(day, o), DataTypes.IntegerType);
   +  }
   +
   +  private static Integer dayUDF(Transform<Object, Integer> day, Object obj) {
   +    if (obj == null) {
   +      return day.apply(null);
   +    }
   +
   +    if (obj instanceof Timestamp) {
   +      Timestamp ts = (Timestamp) obj;
   +      return day.apply(DateTimeUtil.microsFromInstant(ts.toInstant()));
   +    } else {
   +      throw new IllegalArgumentException("Invalid type: " + obj.getClass());
   +    }
   +  }
    }
   diff --git a/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSpark.java b/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSpark.java
   index 14785d7f2..725ff955f 100644
   --- a/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSpark.java
   +++ b/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSpark.java
   @@ -67,4 +67,10 @@ public abstract class TestIcebergSpark {
        Assert.assertEquals((int) Transforms.bucket(Types.StringType.get(), 16).apply("hello"),
            results3.get(0).getInt(0));
      }
   +
   +  @Test
   +  public void testRegisterDayUDF() {
   +    IcebergSpark.registerDayUDF(spark, "iceberg_day", DataTypes.TimestampType);
   +    spark.sql("SELECT iceberg_day(to_timestamp('2016-12-31 00:12:00'))").collectAsList();
   +  }
    }
   -- 
   2.20.1 (Apple Git-117)
   ```
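   
   For reference, here is a JDK-only sketch of the conversion `dayUDF` performs. A `java.sql.Timestamp` becomes microseconds since the epoch (what `DateTimeUtil.microsFromInstant` returns), and the day transform maps that to whole days since 1970-01-01. The class and method names are hypothetical. The floor-division rounding follows the day definition in the Iceberg table spec and is not copied from `Transforms.day`. Because the day is counted from the instant in UTC, a timestamp parsed in a non-UTC Spark session time zone can land on a neighbouring day.
   
   ```java
   import java.sql.Timestamp;
   import java.time.Instant;
   import java.time.temporal.ChronoUnit;
   
   // Hypothetical stand-alone sketch; not part of Iceberg or of the patch above.
   class DayTransformSketch {
     private static final long MICROS_PER_DAY = 86_400_000_000L;
   
     // Local stand-in for DateTimeUtil.microsFromInstant: microseconds since 1970-01-01T00:00:00Z.
     static long microsFromInstant(Instant instant) {
       return ChronoUnit.MICROS.between(Instant.EPOCH, instant);
     }
   
     // Whole days since 1970-01-01 (UTC), rounding toward negative infinity,
     // so an instant one second before the epoch maps to -1 rather than 0.
     static Integer day(Timestamp ts) {
       if (ts == null) {
         return null; // null in, null out
       }
       long micros = microsFromInstant(ts.toInstant());
       return Math.toIntExact(Math.floorDiv(micros, MICROS_PER_DAY));
     }
   
     public static void main(String[] args) {
       Timestamp ts = Timestamp.from(Instant.parse("2016-12-31T00:12:00Z"));
       System.out.println(day(ts)); // prints 17166
     }
   }
   ```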
   
   You can run the added test case `testRegisterDayUDF` to verify that the UDF registers and runs. Maybe we could also register all of Iceberg's built-in transforms as functions in `IcebergSpark`.
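   
   Following that suggestion, hypothetical `registerYearUDF`, `registerMonthUDF` and `registerHourUDF` methods would presumably wrap `Transforms.year`, `Transforms.month` and `Transforms.hour` the same way the patch wraps `Transforms.day`. The JDK-only sketch below only illustrates the values those transforms are defined to produce in the Iceberg table spec (years from 1970, months from 1970-01, hours from 1970-01-01 00:00:00 UTC), starting from microseconds since the epoch. It is not Iceberg's implementation, and the class name is hypothetical.
   
   ```java
   import java.time.Instant;
   import java.time.OffsetDateTime;
   import java.time.ZoneOffset;
   import java.time.temporal.ChronoUnit;
   
   // Hypothetical sketch of the other time-based transforms, following the spec definitions.
   class TimeTransformsSketch {
     private static final long MICROS_PER_HOUR = 3_600_000_000L;
   
     // Interprets microseconds since the epoch as a UTC date-time.
     private static OffsetDateTime toUtc(long micros) {
       return Instant.EPOCH.plus(micros, ChronoUnit.MICROS).atOffset(ZoneOffset.UTC);
     }
   
     // year: years from 1970 (1969 maps to -1).
     static int year(long micros) {
       return toUtc(micros).getYear() - 1970;
     }
   
     // month: months from 1970-01 (1969-12 maps to -1).
     static int month(long micros) {
       OffsetDateTime t = toUtc(micros);
       return (t.getYear() - 1970) * 12 + (t.getMonthValue() - 1);
     }
   
     // hour: hours from 1970-01-01 00:00:00 UTC, floored so pre-epoch values round down.
     static int hour(long micros) {
       return Math.toIntExact(Math.floorDiv(micros, MICROS_PER_HOUR));
     }
   
     public static void main(String[] args) {
       long micros = ChronoUnit.MICROS.between(Instant.EPOCH, Instant.parse("2016-12-31T00:12:00Z"));
       System.out.println(year(micros) + " " + month(micros) + " " + hour(micros)); // prints "46 563 411984"
     }
   }
   ```
   
   Computing year and month from calendar fields (rather than `ChronoUnit.MONTHS.between`, which truncates toward zero) keeps pre-epoch values consistent with the floored hour and day results.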

