Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2022/06/23 21:21:39 UTC

[GitHub] [iceberg] kbendick commented on a diff in pull request #5120: Flink: add sql partition functions

kbendick commented on code in PR #5120:
URL: https://github.com/apache/iceberg/pull/5120#discussion_r905464803


##########
flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java:
##########
@@ -619,12 +631,17 @@ public void alterPartition(ObjectPath tablePath, CatalogPartitionSpec partitionS
 
   @Override
   public List<String> listFunctions(String dbName) throws CatalogException {
-    return Collections.emptyList();
+    return Lists.newArrayList(partitionFunctions.keySet());
   }
 
   @Override
   public CatalogFunction getFunction(ObjectPath functionPath) throws FunctionNotExistException, CatalogException {
-    throw new FunctionNotExistException(getName(), functionPath);
+    CatalogFunction catalogFunction = partitionFunctions.get(functionPath.getObjectName());

Review Comment:
   Are there any situations where this might get used for reasons other than just partition UDFs? For example, accessing some other UDF that the user might have registered?
   
   From the existing code it looks fine, but for my own understanding, I'm still wondering whether it's possible for a user to register their own UDF associated with this Catalog, and if so, whether this change would prevent that.
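   
   A minimal sketch of the lookup behavior I'm asking about (not the actual PR code; the null handling after the `get` call is my assumption of how the rest of the method reads). Built-in partition UDFs come from the static map, and everything else still throws `FunctionNotExistException`, so a user-registered catalog function would have no path through here:
   
   ```java
   @Override
   public CatalogFunction getFunction(ObjectPath functionPath)
       throws FunctionNotExistException, CatalogException {
     // Built-in partition UDFs are resolved from the static map.
     CatalogFunction catalogFunction = partitionFunctions.get(functionPath.getObjectName());
     if (catalogFunction == null) {
       // Anything else, including a function the user tried to register, still fails here.
       throw new FunctionNotExistException(getName(), functionPath);
     }
     return catalogFunction;
   }
   ```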



##########
flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/PartitionTransformUdf.java:
##########
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.sink;
+
+import org.apache.flink.table.annotation.DataTypeHint;
+import org.apache.flink.table.annotation.InputGroup;
+import org.apache.flink.table.functions.ScalarFunction;
+import org.apache.iceberg.expressions.Literals;
+import org.apache.iceberg.transforms.Transform;
+import org.apache.iceberg.transforms.Transforms;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.TypeUtil;
+
+public class PartitionTransformUdf {
+
+  public static class Truncate extends ScalarFunction {
+    public String eval(int num, @DataTypeHint(inputGroup = InputGroup.ANY) Object obj) {
+      Type type = TypeUtil.fromJavaType(obj);
+      Transform<Object, Object> truncate = Transforms.truncate(type, num);
+      Object value = truncate.apply(Literals.fromJavaType(obj).to(type).value());

Review Comment:
   Do we need to do any further type validation? What will happen if the type passed in is not supported?
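   
   If it helps frame the question, here's a hedged sketch of the kind of guard I have in mind. It assumes `TypeUtil.fromJavaType` returns `null` (rather than throwing) for Java types that have no Iceberg mapping, which is exactly the behavior I'd like to confirm:
   
   ```java
   public String eval(int num, @DataTypeHint(inputGroup = InputGroup.ANY) Object obj) {
     Type type = TypeUtil.fromJavaType(obj);
     if (type == null) {
       // Hypothetical guard: fail with a clear message instead of whatever
       // Transforms.truncate or Literals.fromJavaType would throw downstream.
       throw new IllegalArgumentException("truncate() does not support input: " + obj);
     }
     Transform<Object, Object> truncate = Transforms.truncate(type, num);
     Object value = truncate.apply(Literals.fromJavaType(obj).to(type).value());
     return truncate.toHumanString(value);
   }
   ```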



##########
flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/PartitionTransformUdf.java:
##########
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.sink;
+
+import org.apache.flink.table.annotation.DataTypeHint;
+import org.apache.flink.table.annotation.InputGroup;
+import org.apache.flink.table.functions.ScalarFunction;
+import org.apache.iceberg.expressions.Literals;
+import org.apache.iceberg.transforms.Transform;
+import org.apache.iceberg.transforms.Transforms;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.TypeUtil;
+
+public class PartitionTransformUdf {
+
+  public static class Truncate extends ScalarFunction {
+    public String eval(int num, @DataTypeHint(inputGroup = InputGroup.ANY) Object obj) {

Review Comment:
   Is there any way for us to be more specific about what types this function can take? Does Flink use these annotations to check the user's SQL before running the query?
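   
   For context on what I mean by "more specific": Flink resolves `ScalarFunction` `eval` overloads from the argument types, so typed overloads (rather than `InputGroup.ANY`) should let the planner reject unsupported column types before the job runs. A rough sketch, not a drop-in replacement:
   
   ```java
   public static class Truncate extends ScalarFunction {
     public String eval(int width, String value) {
       Transform<Object, Object> truncate = Transforms.truncate(Types.StringType.get(), width);
       return truncate.toHumanString(truncate.apply(value));
     }
   
     public String eval(int width, Integer value) {
       Transform<Object, Object> truncate = Transforms.truncate(Types.IntegerType.get(), width);
       return truncate.toHumanString(truncate.apply(value));
     }
   
     // ... one overload per supported source type (long, decimal, binary, ...)
   }
   ```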



##########
flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/sink/TestPartitionUdf.java:
##########
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.sink;
+
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.util.List;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.TableResult;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
+import org.apache.iceberg.expressions.Literal;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.transforms.Transform;
+import org.apache.iceberg.transforms.Transforms;
+import org.apache.iceberg.types.Types;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class TestPartitionUdf {
+
+  public static TableEnvironment tEnv;
+
+  @BeforeClass
+  public static void before() {
+    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+    tEnv = StreamTableEnvironment.create(env);
+    tEnv.createTemporarySystemFunction("buckets", PartitionTransformUdf.Bucket.class);
+    tEnv.createTemporarySystemFunction("truncates", PartitionTransformUdf.Truncate.class);

Review Comment:
   Question: Can we provide a single registration option so things are uniform? In my opinion, the `bucket` and `truncate` functions are needed in nearly every Iceberg workflow (though not in all situations).
   
   If we automatically register the functions for users when the catalog is instantiated, does that have any performance impact or downsides?
   
   Also, for naming, is it possible that we might clash with some other name (if we registered the functions for people)?
   
   I’m working on function registration in Spark so we should ideally get our names somewhat stabilized.
   
   I would propose `iceberg_bucket` but need to better understand how it is used.
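   
   Purely as an illustration of the naming idea (where this would hook in, and whether automatic registration is even desirable, are the open questions above):
   
   ```java
   // Hypothetical: register prefixed names once, e.g. when the Iceberg catalog is created.
   tableEnv.createTemporarySystemFunction("iceberg_bucket", PartitionTransformUdf.Bucket.class);
   tableEnv.createTemporarySystemFunction("iceberg_truncate", PartitionTransformUdf.Truncate.class);
   ```
   
   Usage would then read `SELECT iceberg_bucket(16, id) FROM t`, which seems unlikely to clash with user-defined names.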



##########
flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/PartitionTransformUdf.java:
##########
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.sink;
+
+import org.apache.flink.table.annotation.DataTypeHint;
+import org.apache.flink.table.annotation.InputGroup;
+import org.apache.flink.table.functions.ScalarFunction;
+import org.apache.iceberg.expressions.Literals;
+import org.apache.iceberg.transforms.Transform;
+import org.apache.iceberg.transforms.Transforms;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.TypeUtil;
+
+public class PartitionTransformUdf {
+
+  public static class Truncate extends ScalarFunction {
+    public String eval(int num, @DataTypeHint(inputGroup = InputGroup.ANY) Object obj) {
+      Type type = TypeUtil.fromJavaType(obj);
+      Transform<Object, Object> truncate = Transforms.truncate(type, num);
+      Object value = truncate.apply(Literals.fromJavaType(obj).to(type).value());
+      return truncate.toHumanString(value);
+    }
+  }
+
+  public static class Bucket extends ScalarFunction {
+    public String eval(int num, @DataTypeHint(inputGroup = InputGroup.ANY) Object obj) {
+      Type type = TypeUtil.fromJavaType(obj);
+      Transform<Object, Integer> bucket = Transforms.bucket(type, num);

Review Comment:
   Question for my own understanding:
   
   Is it possible to make a typed version of the function? Is that, generally speaking, common or desired for Flink UDFs?
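   
   To make the question concrete, here is a hedged sketch of one typed variant. It assumes we would even want the raw bucket id (typed INT output) rather than the human-readable string, which is part of what I'm asking:
   
   ```java
   public static class Bucket extends ScalarFunction {
     // Typed input and typed INT output instead of InputGroup.ANY plus a string result.
     public Integer eval(int numBuckets, Long value) {
       Transform<Long, Integer> bucket = Transforms.bucket(Types.LongType.get(), numBuckets);
       return bucket.apply(value);
     }
   }
   ```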



##########
flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/sink/TestPartitionUdf.java:
##########
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.sink;
+
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.util.List;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.TableResult;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
+import org.apache.iceberg.expressions.Literal;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.transforms.Transform;
+import org.apache.iceberg.transforms.Transforms;
+import org.apache.iceberg.types.Types;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class TestPartitionUdf {
+
+  public static TableEnvironment tEnv;
+
+  @BeforeClass
+  public static void before() {
+    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+    tEnv = StreamTableEnvironment.create(env);
+    tEnv.createTemporarySystemFunction("buckets", PartitionTransformUdf.Bucket.class);
+    tEnv.createTemporarySystemFunction("truncates", PartitionTransformUdf.Truncate.class);
+  }
+
+  protected List<Row> sql(String query, Object... args) {
+    TableResult tableResult = tEnv.executeSql(String.format(query, args));
+    try (CloseableIterator<Row> it = tableResult.collect()) {
+      return Lists.newArrayList(it);
+    } catch (Exception e) {
+      throw new RuntimeException("Failed to collect table result", e);
+    }
+  }
+
+
+  @Test
+  public void testBucketDate() {
+    Transform<Object, Integer> bucket = Transforms.bucket(Types.DateType.get(), 4);
+
+    String date = "2022-05-20";
+    Integer value = bucket.apply(Literal.of(date).to(Types.DateType.get()).value());
+    String expectHumanString = bucket.toHumanString(value);
+
+    List<Row> sql = sql("SELECT buckets(4, DATE '%s')", date);
+    Assert.assertEquals(expectHumanString, sql.get(0).getField(0));
+  }
+
+  @Test
+  public void testBucketInt() {
+    Transform<Object, Integer> bucket = Transforms.bucket(Types.IntegerType.get(), 4);
+
+    int num = 10;
+    Integer obj = bucket.apply(num);
+    String expectHumanString = bucket.toHumanString(obj);
+
+    List<Row> sql = sql("SELECT buckets(4, %d)", num);
+    Assert.assertEquals(expectHumanString, sql.get(0).getField(0));
+  }
+
+  @Test
+  public void testBucketLong() {
+    Transform<Object, Integer> bucket = Transforms.bucket(Types.LongType.get(), 4);
+
+    long num = 10;
+    Integer obj = bucket.apply(num);
+    String expectHumanString = bucket.toHumanString(obj);
+
+    List<Row> sql = sql("SELECT buckets(4, %d)", num);
+    Assert.assertEquals(expectHumanString, sql.get(0).getField(0));
+  }
+
+  @Test
+  public void testBucketTimestamp() {
+    Transform<Object, Integer> bucket = Transforms.bucket(Types.TimestampType.withoutZone(), 4);
+    String ts = "2022-05-20T10:12:55.038194";
+    Literal<Long> lts = Literal.of(ts).to((Types.TimestampType.withoutZone()));
+    Integer apply = bucket.apply(lts.value());
+    String expectHumanString = bucket.toHumanString(apply);
+
+    List<Row> sql = sql("SELECT buckets(4, TIMESTAMP '2022-05-20 10:12:55.038194')");
+    Assert.assertEquals(expectHumanString, sql.get(0).getField(0));
+  }
+
+  @Test
+  public void testBucketString() {
+    Transform<Object, Integer> bucket = Transforms.bucket(Types.StringType.get(), 4);
+
+    String str = "abcdef";
+    Integer obj = bucket.apply(str);
+    String expectHumanString = bucket.toHumanString(obj);
+
+    List<Row> sql = sql("SELECT buckets(4, '%s')", str);
+    Assert.assertEquals(expectHumanString, sql.get(0).getField(0));
+  }
+
+  @Test
+  public void testBucketDecimalType() {
+    Transform<Object, Integer> bucket = Transforms.bucket(Types.DecimalType.of(6, 5), 4);
+
+    Integer obj = bucket.apply(BigDecimal.valueOf(6.12345));
+    String expectHumanString = bucket.toHumanString(obj);
+
+    List<Row> sql = sql("select buckets(4, cast(6.12345 as decimal(6,5)))");
+    Assert.assertEquals(expectHumanString, sql.get(0).getField(0));
+  }
+
+  @Test
+  public void testBucketFixed() {
+    Transform<Object, Integer> bucket = Transforms.bucket(Types.FixedType.ofLength(6), 4);
+
+    Integer obj = bucket.apply(ByteBuffer.wrap(new byte[] {1, 2, 3, 4, 5, 6}));
+    String expectHumanString = bucket.toHumanString(obj);
+
+    List<Row> sql = sql("SELECT buckets(4, x'010203040506')");
+    Assert.assertEquals(expectHumanString, sql.get(0).getField(0));
+  }
+
+  @Test
+  public void testBucketTime() {
+    Transform<Object, Integer> bucket = Transforms.bucket(Types.TimeType.get(), 4);
+
+    String str = "14:08:59";
+    Integer obj = bucket.apply(Literal.of(str).to(Types.TimeType.get()).value());
+    String expectHumanString = bucket.toHumanString(obj);
+
+    List<Row> sql = sql("SELECT buckets(4, TIME '%s')", str);
+    Assert.assertEquals(expectHumanString, sql.get(0).getField(0));
+  }
+
+  @Test
+  public void testBucketByteBuffer() {
+    Transform<Object, Integer> bucket = Transforms.bucket(Types.BinaryType.get(), 4);
+
+    Integer obj = bucket.apply(ByteBuffer.wrap(new byte[] {1, 2, 3, 4, 5, 6}));
+    String expectHumanString = bucket.toHumanString(obj);
+
+    List<Row> sql = sql("SELECT buckets(4, x'010203040506')");
+    Assert.assertEquals(expectHumanString, sql.get(0).getField(0));
+  }
+
+  @Test
+  public void testTruncateInt() {
+    Transform<Object, Object> truncate = Transforms.truncate(Types.IntegerType.get(), 4);
+
+    int num = 10;
+    Object obj = truncate.apply(num);
+    String expectHumanString = truncate.toHumanString(obj);
+
+    List<Row> sql = sql("SELECT truncates(4, %d)", num);
+    Assert.assertEquals(expectHumanString, sql.get(0).getField(0));
+  }
+
+  @Test
+  public void testTruncateLong() {
+    Transform<Object, Object> truncate = Transforms.truncate(Types.LongType.get(), 4);
+
+    long num = 10;
+    Object obj = truncate.apply(num);
+    String expectHumanString = truncate.toHumanString(obj);
+
+    List<Row> sql = sql("SELECT truncates(4, %d)", num);
+    Assert.assertEquals(expectHumanString, sql.get(0).getField(0));
+  }
+
+  @Test
+  public void testTruncateString() {
+    Transform<Object, Object> truncate = Transforms.truncate(Types.StringType.get(), 4);
+
+    String str = "abcdef";
+    Object obj = truncate.apply(str);
+    String expectHumanString = truncate.toHumanString(obj);
+
+    List<Row> sql = sql("SELECT truncates(4, '%s')", str);
+    Assert.assertEquals(expectHumanString, sql.get(0).getField(0));
+  }
+
+  @Test
+  public void testTruncateDecimalType() {
+    Transform<Object, Object> truncate = Transforms.truncate(Types.DecimalType.of(6, 5), 4);
+
+    Object obj = truncate.apply(BigDecimal.valueOf(6.12345));
+    String expectHumanString = truncate.toHumanString(obj);
+
+    List<Row> sql = sql("select truncates(4, cast(6.12345 as decimal(6,5)))");
+    Assert.assertEquals(expectHumanString, sql.get(0).getField(0));
+  }
+
+  @Test
+  public void testTruncateByteBuffer() {
+    Transform<Object, Object> truncate = Transforms.truncate(Types.BinaryType.get(), 4);
+
+    Object obj = truncate.apply(ByteBuffer.wrap(new byte[] {1, 2, 3, 4, 5, 6}));
+    String expectHumanString = truncate.toHumanString(obj);
+
+    List<Row> sql = sql("SELECT truncates(4, x'010203040506')");
+    Assert.assertEquals(expectHumanString, sql.get(0).getField(0));
+  }

Review Comment:
   I’m still reviewing these tests, but for any types that are _not_ supported, can we add a test checking what happens if those types are passed in?
   
   If an exception results, `AssertHelpers` has some methods that should be useful.
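   
   Something along these lines is what I have in mind. This is hypothetical: BOOLEAN is assumed to be an unsupported input, the exact exception type that surfaces through the `sql` helper would need to be confirmed, and I'm assuming the three-argument `AssertHelpers.assertThrows` overload:
   
   ```java
   @Test
   public void testTruncateUnsupportedType() {
     // BOOLEAN has no truncate transform, so the query should fail rather than
     // silently produce a value.
     AssertHelpers.assertThrows("Should reject types without a truncate transform",
         RuntimeException.class,
         () -> sql("SELECT truncates(4, true)"));
   }
   ```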



##########
flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/sink/TestPartitionUdf.java:
##########
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.sink;
+
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.util.List;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.TableResult;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
+import org.apache.iceberg.expressions.Literal;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.transforms.Transform;
+import org.apache.iceberg.transforms.Transforms;
+import org.apache.iceberg.types.Types;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class TestPartitionUdf {
+
+  public static TableEnvironment tEnv;
+
+  @BeforeClass
+  public static void before() {
+    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+    tEnv = StreamTableEnvironment.create(env);
+    tEnv.createTemporarySystemFunction("buckets", PartitionTransformUdf.Bucket.class);
+    tEnv.createTemporarySystemFunction("truncates", PartitionTransformUdf.Truncate.class);
+  }
+
+  protected List<Row> sql(String query, Object... args) {
+    TableResult tableResult = tEnv.executeSql(String.format(query, args));
+    try (CloseableIterator<Row> it = tableResult.collect()) {
+      return Lists.newArrayList(it);
+    } catch (Exception e) {
+      throw new RuntimeException("Failed to collect table result", e);
+    }
+  }
+
+
+  @Test
+  public void testBucketDate() {
+    Transform<Object, Integer> bucket = Transforms.bucket(Types.DateType.get(), 4);
+
+    String date = "2022-05-20";
+    Integer value = bucket.apply(Literal.of(date).to(Types.DateType.get()).value());
+    String expectHumanString = bucket.toHumanString(value);
+
+    List<Row> sql = sql("SELECT buckets(4, DATE '%s')", date);
+    Assert.assertEquals(expectHumanString, sql.get(0).getField(0));
+  }
+
+  @Test
+  public void testBucketInt() {
+    Transform<Object, Integer> bucket = Transforms.bucket(Types.IntegerType.get(), 4);
+
+    int num = 10;
+    Integer obj = bucket.apply(num);
+    String expectHumanString = bucket.toHumanString(obj);
+
+    List<Row> sql = sql("SELECT buckets(4, %d)", num);
+    Assert.assertEquals(expectHumanString, sql.get(0).getField(0));
+  }
+
+  @Test
+  public void testBucketLong() {
+    Transform<Object, Integer> bucket = Transforms.bucket(Types.LongType.get(), 4);
+
+    long num = 10;
+    Integer obj = bucket.apply(num);
+    String expectHumanString = bucket.toHumanString(obj);

Review Comment:
   Nit: Can we have a test where the expected human string is asserted directly? E.g. a hard-coded string value as the expected human string, compared against the result of this call?
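   
   A minimal sketch of what I mean. Truncate expectations are easy to hard-code; bucket expectations could similarly be hard-coded from the reference hash values in the Iceberg spec appendix:
   
   ```java
   @Test
   public void testTruncateStringHumanString() {
     // Hard-coded expectation: truncating 'abcdef' to width 4 should read as "abcd".
     List<Row> sql = sql("SELECT truncates(4, 'abcdef')");
     Assert.assertEquals("abcd", sql.get(0).getField(0));
   }
   ```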



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org

