You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2022/08/11 16:41:03 UTC

[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #5431: Spark: Support truncate in FunctionCatalog

aokolnychyi commented on code in PR #5431:
URL: https://github.com/apache/iceberg/pull/5431#discussion_r943667770


##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/functions/TruncateFunction.java:
##########
@@ -0,0 +1,340 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.functions;
+
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
+import org.apache.iceberg.util.BinaryUtil;
+import org.apache.iceberg.util.ByteBuffers;
+import org.apache.iceberg.util.TruncateUtil;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.connector.catalog.functions.BoundFunction;
+import org.apache.spark.sql.connector.catalog.functions.ScalarFunction;
+import org.apache.spark.sql.connector.catalog.functions.UnboundFunction;
+import org.apache.spark.sql.types.BinaryType;
+import org.apache.spark.sql.types.ByteType;
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.Decimal;
+import org.apache.spark.sql.types.DecimalType;
+import org.apache.spark.sql.types.IntegerType;
+import org.apache.spark.sql.types.LongType;
+import org.apache.spark.sql.types.ShortType;
+import org.apache.spark.sql.types.StringType;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.unsafe.types.UTF8String;
+
+/**
+ * A Spark function implementation for the Iceberg truncate transform.
+ *
+ * <p>Example usage: {@code SELECT system.truncate(1, 'abc')}, which returns the String 'a'.
+ *
+ * <p>Note that for performance reasons, the given input width is not validated in the
+ * implementations used in code-gen. The width must remain non-negative to give meaningful results.
+ */
+public class TruncateFunction implements UnboundFunction {
+
+  private static void validateTruncationWidthType(DataType widthType) {
+    if (!DataTypes.IntegerType.sameType(widthType)
+        && !DataTypes.ShortType.sameType(widthType)
+        && !DataTypes.ByteType.sameType(widthType)) {
+      throw new UnsupportedOperationException(
+          "Expected truncation width to be tinyint, shortint or int");

Review Comment:
   nit: I'd probably either use the `Cannot bind truncate:` prefix in all error messages related to binding or drop it everywhere for consistency.



##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/functions/TruncateFunction.java:
##########
@@ -0,0 +1,340 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.functions;
+
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
+import org.apache.iceberg.util.BinaryUtil;
+import org.apache.iceberg.util.ByteBuffers;
+import org.apache.iceberg.util.TruncateUtil;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.connector.catalog.functions.BoundFunction;
+import org.apache.spark.sql.connector.catalog.functions.ScalarFunction;
+import org.apache.spark.sql.connector.catalog.functions.UnboundFunction;
+import org.apache.spark.sql.types.BinaryType;
+import org.apache.spark.sql.types.ByteType;
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.Decimal;
+import org.apache.spark.sql.types.DecimalType;
+import org.apache.spark.sql.types.IntegerType;
+import org.apache.spark.sql.types.LongType;
+import org.apache.spark.sql.types.ShortType;
+import org.apache.spark.sql.types.StringType;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.unsafe.types.UTF8String;
+
+/**
+ * A Spark function implementation for the Iceberg truncate transform.
+ *
+ * <p>Example usage: {@code SELECT system.truncate(1, 'abc')}, which returns the String 'a'.
+ *
+ * <p>Note that for performance reasons, the given input width is not validated in the
+ * implementations used in code-gen. The width must remain non-negative to give meaningful results.
+ */
+public class TruncateFunction implements UnboundFunction {
+
+  private static void validateTruncationWidthType(DataType widthType) {
+    if (!DataTypes.IntegerType.sameType(widthType)
+        && !DataTypes.ShortType.sameType(widthType)
+        && !DataTypes.ByteType.sameType(widthType)) {
+      throw new UnsupportedOperationException(
+          "Expected truncation width to be tinyint, shortint or int");
+    }
+  }
+
+  @Override
+  public BoundFunction bind(StructType inputType) {
+    if (inputType.fields().length != 2) {

Review Comment:
   nit: just `inputType.length()` or `inputType.size()`?



##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/functions/TruncateFunction.java:
##########
@@ -0,0 +1,340 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.functions;
+
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
+import org.apache.iceberg.util.BinaryUtil;
+import org.apache.iceberg.util.ByteBuffers;
+import org.apache.iceberg.util.TruncateUtil;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.connector.catalog.functions.BoundFunction;
+import org.apache.spark.sql.connector.catalog.functions.ScalarFunction;
+import org.apache.spark.sql.connector.catalog.functions.UnboundFunction;
+import org.apache.spark.sql.types.BinaryType;
+import org.apache.spark.sql.types.ByteType;
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.Decimal;
+import org.apache.spark.sql.types.DecimalType;
+import org.apache.spark.sql.types.IntegerType;
+import org.apache.spark.sql.types.LongType;
+import org.apache.spark.sql.types.ShortType;
+import org.apache.spark.sql.types.StringType;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.unsafe.types.UTF8String;
+
+/**
+ * A Spark function implementation for the Iceberg truncate transform.
+ *
+ * <p>Example usage: {@code SELECT system.truncate(1, 'abc')}, which returns the String 'a'.
+ *
+ * <p>Note that for performance reasons, the given input width is not validated in the
+ * implementations used in code-gen. The width must remain non-negative to give meaningful results.
+ */
+public class TruncateFunction implements UnboundFunction {
+
+  private static void validateTruncationWidthType(DataType widthType) {
+    if (!DataTypes.IntegerType.sameType(widthType)
+        && !DataTypes.ShortType.sameType(widthType)
+        && !DataTypes.ByteType.sameType(widthType)) {
+      throw new UnsupportedOperationException(
+          "Expected truncation width to be tinyint, shortint or int");
+    }
+  }
+
+  @Override
+  public BoundFunction bind(StructType inputType) {
+    if (inputType.fields().length != 2) {
+      throw new UnsupportedOperationException(
+          "Cannot bind truncate: wrong number of inputs (expected width and value)");
+    }
+
+    StructField widthField = inputType.fields()[0];
+    StructField toTruncateField = inputType.fields()[1];
+    validateTruncationWidthType(widthField.dataType());
+
+    DataType toTruncateDataType = toTruncateField.dataType();
+    if (toTruncateDataType instanceof ByteType) {
+      return new TruncateTinyInt();
+    } else if (toTruncateDataType instanceof ShortType) {
+      return new TruncateSmallInt();
+    } else if (toTruncateDataType instanceof IntegerType) {
+      return new TruncateInt();
+    } else if (toTruncateDataType instanceof LongType) {
+      return new TruncateBigInt();
+    } else if (toTruncateDataType instanceof DecimalType) {
+      return new TruncateDecimal(
+          ((DecimalType) toTruncateDataType).precision(),
+          ((DecimalType) toTruncateDataType).scale());
+    } else if (toTruncateDataType instanceof StringType) {
+      return new TruncateString();
+    } else if (toTruncateDataType instanceof BinaryType) {
+      return new TruncateBinary();
+    } else {
+      throw new UnsupportedOperationException(
+          "Expected truncation col to be tinyint, shortint, int, bigint, decimal, string, or binary");
+    }
+  }
+
+  @Override
+  public String description() {
+    return name()
+        + "(width, col) - Call Iceberg's truncate transform\n"
+        + "  width :: width for truncation, e.g. truncate(10, 255) -> 250 (must be an integer)\n"
+        + "  col :: column to truncate (must be an integer, decimal, string, or binary)";
+  }
+
+  @Override
+  public String name() {
+    return "truncate";
+  }
+
+  public abstract static class TruncateBase<T> implements ScalarFunction<T> {
+    @Override
+    public String name() {
+      return "truncate";
+    }
+  }
+
+  public static class TruncateTinyInt extends TruncateBase<Byte> {
+    public static byte invoke(int width, byte value) {
+      return TruncateUtil.truncateByte(width, value);
+    }
+
+    @Override
+    public DataType[] inputTypes() {
+      return new DataType[] {DataTypes.IntegerType, DataTypes.ByteType};
+    }
+
+    @Override
+    public DataType resultType() {
+      return DataTypes.ByteType;
+    }
+
+    @Override
+    public String canonicalName() {
+      return "iceberg.truncate(tinyint)";
+    }
+
+    @Override
+    public Byte produceResult(InternalRow input) {
+      return input.isNullAt(0) || input.isNullAt(1)

Review Comment:
   What about defining constants for 0 and 1 with some reasonable names to indicate what those are?
   
   ```
   private static final int WIDTH_ORDINAL = 0;
   private static final int VALUE_ORDINAL = 1;
   ```



##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/functions/TruncateFunction.java:
##########
@@ -0,0 +1,340 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.functions;
+
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
+import org.apache.iceberg.util.BinaryUtil;
+import org.apache.iceberg.util.ByteBuffers;
+import org.apache.iceberg.util.TruncateUtil;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.connector.catalog.functions.BoundFunction;
+import org.apache.spark.sql.connector.catalog.functions.ScalarFunction;
+import org.apache.spark.sql.connector.catalog.functions.UnboundFunction;
+import org.apache.spark.sql.types.BinaryType;
+import org.apache.spark.sql.types.ByteType;
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.Decimal;
+import org.apache.spark.sql.types.DecimalType;
+import org.apache.spark.sql.types.IntegerType;
+import org.apache.spark.sql.types.LongType;
+import org.apache.spark.sql.types.ShortType;
+import org.apache.spark.sql.types.StringType;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.unsafe.types.UTF8String;
+
+/**
+ * A Spark function implementation for the Iceberg truncate transform.
+ *
+ * <p>Example usage: {@code SELECT system.truncate(1, 'abc')}, which returns the String 'a'.
+ *
+ * <p>Note that for performance reasons, the given input width is not validated in the
+ * implementations used in code-gen. The width must remain non-negative to give meaningful results.
+ */
+public class TruncateFunction implements UnboundFunction {
+
+  private static void validateTruncationWidthType(DataType widthType) {
+    if (!DataTypes.IntegerType.sameType(widthType)

Review Comment:
   nit: Since we don't expect an array, struct or map, we don't really need `sameType`. You could define a set and call `contains` on it.
   
   ```
   private static final Set<DataType> SUPPORTED_WIDTH_TYPES =
       ImmutableSet.of(DataTypes.ByteType, DataTypes.ShortType, DataTypes.IntegerType);
   
   ...
   
   if (!SUPPORTED_WIDTH_TYPES.contains(widthField.dataType())) {
     throw new UnsupportedOperationException(...);
   }
   ```



##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/functions/TruncateFunction.java:
##########
@@ -0,0 +1,340 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.functions;
+
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
+import org.apache.iceberg.util.BinaryUtil;
+import org.apache.iceberg.util.ByteBuffers;
+import org.apache.iceberg.util.TruncateUtil;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.connector.catalog.functions.BoundFunction;
+import org.apache.spark.sql.connector.catalog.functions.ScalarFunction;
+import org.apache.spark.sql.connector.catalog.functions.UnboundFunction;
+import org.apache.spark.sql.types.BinaryType;
+import org.apache.spark.sql.types.ByteType;
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.Decimal;
+import org.apache.spark.sql.types.DecimalType;
+import org.apache.spark.sql.types.IntegerType;
+import org.apache.spark.sql.types.LongType;
+import org.apache.spark.sql.types.ShortType;
+import org.apache.spark.sql.types.StringType;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.unsafe.types.UTF8String;
+
+/**
+ * A Spark function implementation for the Iceberg truncate transform.
+ *
+ * <p>Example usage: {@code SELECT system.truncate(1, 'abc')}, which returns the String 'a'.
+ *
+ * <p>Note that for performance reasons, the given input width is not validated in the
+ * implementations used in code-gen. The width must remain non-negative to give meaningful results.
+ */
+public class TruncateFunction implements UnboundFunction {
+
+  private static void validateTruncationWidthType(DataType widthType) {
+    if (!DataTypes.IntegerType.sameType(widthType)
+        && !DataTypes.ShortType.sameType(widthType)
+        && !DataTypes.ByteType.sameType(widthType)) {
+      throw new UnsupportedOperationException(
+          "Expected truncation width to be tinyint, shortint or int");
+    }
+  }
+
+  @Override
+  public BoundFunction bind(StructType inputType) {
+    if (inputType.fields().length != 2) {
+      throw new UnsupportedOperationException(
+          "Cannot bind truncate: wrong number of inputs (expected width and value)");
+    }
+
+    StructField widthField = inputType.fields()[0];
+    StructField toTruncateField = inputType.fields()[1];

Review Comment:
   nit: Do we call it `value` in most other places?



##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/functions/TruncateFunction.java:
##########
@@ -0,0 +1,340 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.functions;
+
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
+import org.apache.iceberg.util.BinaryUtil;
+import org.apache.iceberg.util.ByteBuffers;
+import org.apache.iceberg.util.TruncateUtil;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.connector.catalog.functions.BoundFunction;
+import org.apache.spark.sql.connector.catalog.functions.ScalarFunction;
+import org.apache.spark.sql.connector.catalog.functions.UnboundFunction;
+import org.apache.spark.sql.types.BinaryType;
+import org.apache.spark.sql.types.ByteType;
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.Decimal;
+import org.apache.spark.sql.types.DecimalType;
+import org.apache.spark.sql.types.IntegerType;
+import org.apache.spark.sql.types.LongType;
+import org.apache.spark.sql.types.ShortType;
+import org.apache.spark.sql.types.StringType;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.unsafe.types.UTF8String;
+
+/**
+ * A Spark function implementation for the Iceberg truncate transform.
+ *
+ * <p>Example usage: {@code SELECT system.truncate(1, 'abc')}, which returns the String 'a'.
+ *
+ * <p>Note that for performance reasons, the given input width is not validated in the
+ * implementations used in code-gen. The width must remain non-negative to give meaningful results.
+ */
+public class TruncateFunction implements UnboundFunction {
+
+  private static void validateTruncationWidthType(DataType widthType) {
+    if (!DataTypes.IntegerType.sameType(widthType)
+        && !DataTypes.ShortType.sameType(widthType)
+        && !DataTypes.ByteType.sameType(widthType)) {
+      throw new UnsupportedOperationException(
+          "Expected truncation width to be tinyint, shortint or int");
+    }
+  }
+
+  @Override
+  public BoundFunction bind(StructType inputType) {
+    if (inputType.fields().length != 2) {
+      throw new UnsupportedOperationException(
+          "Cannot bind truncate: wrong number of inputs (expected width and value)");
+    }
+
+    StructField widthField = inputType.fields()[0];
+    StructField toTruncateField = inputType.fields()[1];
+    validateTruncationWidthType(widthField.dataType());
+
+    DataType toTruncateDataType = toTruncateField.dataType();
+    if (toTruncateDataType instanceof ByteType) {
+      return new TruncateTinyInt();
+    } else if (toTruncateDataType instanceof ShortType) {
+      return new TruncateSmallInt();
+    } else if (toTruncateDataType instanceof IntegerType) {
+      return new TruncateInt();
+    } else if (toTruncateDataType instanceof LongType) {
+      return new TruncateBigInt();
+    } else if (toTruncateDataType instanceof DecimalType) {
+      return new TruncateDecimal(
+          ((DecimalType) toTruncateDataType).precision(),
+          ((DecimalType) toTruncateDataType).scale());
+    } else if (toTruncateDataType instanceof StringType) {
+      return new TruncateString();
+    } else if (toTruncateDataType instanceof BinaryType) {
+      return new TruncateBinary();
+    } else {
+      throw new UnsupportedOperationException(
+          "Expected truncation col to be tinyint, shortint, int, bigint, decimal, string, or binary");
+    }
+  }
+
+  @Override
+  public String description() {
+    return name()
+        + "(width, col) - Call Iceberg's truncate transform\n"
+        + "  width :: width for truncation, e.g. truncate(10, 255) -> 250 (must be an integer)\n"
+        + "  col :: column to truncate (must be an integer, decimal, string, or binary)";
+  }
+
+  @Override
+  public String name() {
+    return "truncate";
+  }
+
+  public abstract static class TruncateBase<T> implements ScalarFunction<T> {
+    @Override
+    public String name() {
+      return "truncate";
+    }
+  }
+
+  public static class TruncateTinyInt extends TruncateBase<Byte> {
+    public static byte invoke(int width, byte value) {
+      return TruncateUtil.truncateByte(width, value);
+    }
+
+    @Override
+    public DataType[] inputTypes() {
+      return new DataType[] {DataTypes.IntegerType, DataTypes.ByteType};
+    }
+
+    @Override
+    public DataType resultType() {
+      return DataTypes.ByteType;
+    }
+
+    @Override
+    public String canonicalName() {
+      return "iceberg.truncate(tinyint)";
+    }
+
+    @Override
+    public Byte produceResult(InternalRow input) {
+      return input.isNullAt(0) || input.isNullAt(1)
+          ? null
+          : invoke(input.getInt(0), input.getByte(1));
+    }
+  }
+
+  public static class TruncateSmallInt extends TruncateBase<Short> {
+    // magic method used in codegen
+    public static short invoke(int width, short value) {
+      return TruncateUtil.truncateShort(width, value);
+    }
+
+    @Override
+    public DataType[] inputTypes() {
+      return new DataType[] {DataTypes.IntegerType, DataTypes.ShortType};
+    }
+
+    @Override
+    public DataType resultType() {
+      return DataTypes.ShortType;
+    }
+
+    @Override
+    public String canonicalName() {
+      return "iceberg.truncate(smallint)";
+    }
+
+    @Override
+    public Short produceResult(InternalRow input) {
+      return input.isNullAt(0) || input.isNullAt(1)

Review Comment:
   Do we actually allow nullable width?



##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/functions/TruncateFunction.java:
##########
@@ -0,0 +1,340 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.functions;
+
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
+import org.apache.iceberg.util.BinaryUtil;
+import org.apache.iceberg.util.ByteBuffers;
+import org.apache.iceberg.util.TruncateUtil;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.connector.catalog.functions.BoundFunction;
+import org.apache.spark.sql.connector.catalog.functions.ScalarFunction;
+import org.apache.spark.sql.connector.catalog.functions.UnboundFunction;
+import org.apache.spark.sql.types.BinaryType;
+import org.apache.spark.sql.types.ByteType;
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.Decimal;
+import org.apache.spark.sql.types.DecimalType;
+import org.apache.spark.sql.types.IntegerType;
+import org.apache.spark.sql.types.LongType;
+import org.apache.spark.sql.types.ShortType;
+import org.apache.spark.sql.types.StringType;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.unsafe.types.UTF8String;
+
+/**
+ * A Spark function implementation for the Iceberg truncate transform.
+ *
+ * <p>Example usage: {@code SELECT system.truncate(1, 'abc')}, which returns the String 'a'.
+ *
+ * <p>Note that for performance reasons, the given input width is not validated in the
+ * implementations used in code-gen. The width must remain non-negative to give meaningful results.
+ */
+public class TruncateFunction implements UnboundFunction {
+
+  private static void validateTruncationWidthType(DataType widthType) {
+    if (!DataTypes.IntegerType.sameType(widthType)
+        && !DataTypes.ShortType.sameType(widthType)
+        && !DataTypes.ByteType.sameType(widthType)) {
+      throw new UnsupportedOperationException(
+          "Expected truncation width to be tinyint, shortint or int");
+    }
+  }
+
+  @Override
+  public BoundFunction bind(StructType inputType) {
+    if (inputType.fields().length != 2) {
+      throw new UnsupportedOperationException(
+          "Cannot bind truncate: wrong number of inputs (expected width and value)");
+    }
+
+    StructField widthField = inputType.fields()[0];
+    StructField toTruncateField = inputType.fields()[1];
+    validateTruncationWidthType(widthField.dataType());

Review Comment:
   Do we have to validate width nullability?



##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/functions/TruncateFunction.java:
##########
@@ -0,0 +1,340 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.functions;
+
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
+import org.apache.iceberg.util.BinaryUtil;
+import org.apache.iceberg.util.ByteBuffers;
+import org.apache.iceberg.util.TruncateUtil;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.connector.catalog.functions.BoundFunction;
+import org.apache.spark.sql.connector.catalog.functions.ScalarFunction;
+import org.apache.spark.sql.connector.catalog.functions.UnboundFunction;
+import org.apache.spark.sql.types.BinaryType;
+import org.apache.spark.sql.types.ByteType;
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.Decimal;
+import org.apache.spark.sql.types.DecimalType;
+import org.apache.spark.sql.types.IntegerType;
+import org.apache.spark.sql.types.LongType;
+import org.apache.spark.sql.types.ShortType;
+import org.apache.spark.sql.types.StringType;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.unsafe.types.UTF8String;
+
+/**
+ * A Spark function implementation for the Iceberg truncate transform.
+ *
+ * <p>Example usage: {@code SELECT system.truncate(1, 'abc')}, which returns the String 'a'.
+ *
+ * <p>Note that for performance reasons, the given input width is not validated in the
+ * implementations used in code-gen. The width must remain non-negative to give meaningful results.
+ */
+public class TruncateFunction implements UnboundFunction {
+
+  private static void validateTruncationWidthType(DataType widthType) {
+    if (!DataTypes.IntegerType.sameType(widthType)
+        && !DataTypes.ShortType.sameType(widthType)
+        && !DataTypes.ByteType.sameType(widthType)) {
+      throw new UnsupportedOperationException(
+          "Expected truncation width to be tinyint, shortint or int");
+    }
+  }
+
+  @Override
+  public BoundFunction bind(StructType inputType) {
+    if (inputType.fields().length != 2) {
+      throw new UnsupportedOperationException(
+          "Cannot bind truncate: wrong number of inputs (expected width and value)");
+    }
+
+    StructField widthField = inputType.fields()[0];
+    StructField toTruncateField = inputType.fields()[1];
+    validateTruncationWidthType(widthField.dataType());

Review Comment:
   What do you think about adding an empty line before this call to separate the block that instantiates the fields from the validation?



##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/functions/TruncateFunction.java:
##########
@@ -0,0 +1,340 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.functions;
+
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
+import org.apache.iceberg.util.BinaryUtil;
+import org.apache.iceberg.util.ByteBuffers;
+import org.apache.iceberg.util.TruncateUtil;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.connector.catalog.functions.BoundFunction;
+import org.apache.spark.sql.connector.catalog.functions.ScalarFunction;
+import org.apache.spark.sql.connector.catalog.functions.UnboundFunction;
+import org.apache.spark.sql.types.BinaryType;
+import org.apache.spark.sql.types.ByteType;
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.Decimal;
+import org.apache.spark.sql.types.DecimalType;
+import org.apache.spark.sql.types.IntegerType;
+import org.apache.spark.sql.types.LongType;
+import org.apache.spark.sql.types.ShortType;
+import org.apache.spark.sql.types.StringType;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.unsafe.types.UTF8String;
+
+/**
+ * A Spark function implementation for the Iceberg truncate transform.
+ *
+ * <p>Example usage: {@code SELECT system.truncate(1, 'abc')}, which returns the String 'a'.
+ *
+ * <p>Note that for performance reasons, the given input width is not validated in the
+ * implementations used in code-gen. The width must remain non-negative to give meaningful results.
+ */
+public class TruncateFunction implements UnboundFunction {
+
+  private static void validateTruncationWidthType(DataType widthType) {
+    if (!DataTypes.IntegerType.sameType(widthType)
+        && !DataTypes.ShortType.sameType(widthType)
+        && !DataTypes.ByteType.sameType(widthType)) {
+      throw new UnsupportedOperationException(
+          "Expected truncation width to be tinyint, shortint or int");
+    }
+  }
+
+  @Override
+  public BoundFunction bind(StructType inputType) {
+    if (inputType.fields().length != 2) {
+      throw new UnsupportedOperationException(
+          "Cannot bind truncate: wrong number of inputs (expected width and value)");
+    }
+
+    StructField widthField = inputType.fields()[0];
+    StructField toTruncateField = inputType.fields()[1];
+    validateTruncationWidthType(widthField.dataType());
+
+    DataType toTruncateDataType = toTruncateField.dataType();
+    if (toTruncateDataType instanceof ByteType) {
+      return new TruncateTinyInt();
+    } else if (toTruncateDataType instanceof ShortType) {
+      return new TruncateSmallInt();
+    } else if (toTruncateDataType instanceof IntegerType) {
+      return new TruncateInt();
+    } else if (toTruncateDataType instanceof LongType) {
+      return new TruncateBigInt();
+    } else if (toTruncateDataType instanceof DecimalType) {
+      return new TruncateDecimal(
+          ((DecimalType) toTruncateDataType).precision(),
+          ((DecimalType) toTruncateDataType).scale());
+    } else if (toTruncateDataType instanceof StringType) {
+      return new TruncateString();
+    } else if (toTruncateDataType instanceof BinaryType) {
+      return new TruncateBinary();
+    } else {
+      throw new UnsupportedOperationException(
+          "Expected truncation col to be tinyint, shortint, int, bigint, decimal, string, or binary");
+    }
+  }
+
+  @Override
+  public String description() {
+    return name()
+        + "(width, col) - Call Iceberg's truncate transform\n"
+        + "  width :: width for truncation, e.g. truncate(10, 255) -> 250 (must be an integer)\n"
+        + "  col :: column to truncate (must be an integer, decimal, string, or binary)";
+  }
+
+  @Override
+  public String name() {
+    return "truncate";
+  }
+
+  public abstract static class TruncateBase<T> implements ScalarFunction<T> {
+    @Override
+    public String name() {
+      return "truncate";
+    }
+  }
+
+  public static class TruncateTinyInt extends TruncateBase<Byte> {
+    public static byte invoke(int width, byte value) {
+      return TruncateUtil.truncateByte(width, value);
+    }
+
+    @Override
+    public DataType[] inputTypes() {
+      return new DataType[] {DataTypes.IntegerType, DataTypes.ByteType};
+    }
+
+    @Override
+    public DataType resultType() {
+      return DataTypes.ByteType;
+    }
+
+    @Override
+    public String canonicalName() {
+      return "iceberg.truncate(tinyint)";
+    }
+
+    @Override
+    public Byte produceResult(InternalRow input) {
+      return input.isNullAt(0) || input.isNullAt(1)
+          ? null
+          : invoke(input.getInt(0), input.getByte(1));
+    }
+  }
+
+  public static class TruncateSmallInt extends TruncateBase<Short> {
+    // magic method used in codegen
+    public static short invoke(int width, short value) {
+      return TruncateUtil.truncateShort(width, value);
+    }
+
+    @Override
+    public DataType[] inputTypes() {
+      return new DataType[] {DataTypes.IntegerType, DataTypes.ShortType};
+    }
+
+    @Override
+    public DataType resultType() {
+      return DataTypes.ShortType;
+    }
+
+    @Override
+    public String canonicalName() {
+      return "iceberg.truncate(smallint)";
+    }
+
+    @Override
+    public Short produceResult(InternalRow input) {
+      return input.isNullAt(0) || input.isNullAt(1)
+          ? null
+          : invoke(input.getInt(0), input.getShort(1));
+    }
+  }
+
+  public static class TruncateInt extends TruncateBase<Integer> {
+    // magic method used in codegen
+    public static int invoke(int width, int value) {
+      return TruncateUtil.truncateInt(width, value);
+    }
+
+    @Override
+    public DataType[] inputTypes() {
+      return new DataType[] {DataTypes.IntegerType, DataTypes.IntegerType};
+    }
+
+    @Override
+    public DataType resultType() {
+      return DataTypes.IntegerType;
+    }
+
+    @Override
+    public String canonicalName() {
+      return "iceberg.truncate(int)";
+    }
+
+    @Override
+    public Integer produceResult(InternalRow input) {
+      return input.isNullAt(0) || input.isNullAt(1)
+          ? null
+          : invoke(input.getInt(0), input.getInt(1));
+    }
+  }
+
+  public static class TruncateBigInt extends TruncateBase<Long> {
+    // magic function for usage with codegen
+    public static long invoke(int width, long value) {
+      return TruncateUtil.truncateLong(width, value);
+    }
+
+    @Override
+    public DataType[] inputTypes() {
+      return new DataType[] {DataTypes.IntegerType, DataTypes.LongType};
+    }
+
+    @Override
+    public DataType resultType() {
+      return DataTypes.LongType;
+    }
+
+    @Override
+    public String canonicalName() {
+      return "iceberg.truncate(bigint)";
+    }
+
+    @Override
+    public Long produceResult(InternalRow input) {
+      return input.isNullAt(0) || input.isNullAt(1)

Review Comment:
   I wonder whether explicit if statements would be more readable given that we have to split the ternary operation onto multiple lines.



##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/functions/TruncateFunction.java:
##########
@@ -0,0 +1,348 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.functions;
+
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
+import org.apache.iceberg.util.BinaryUtil;
+import org.apache.iceberg.util.ByteBuffers;
+import org.apache.iceberg.util.TruncateUtil;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.connector.catalog.functions.BoundFunction;
+import org.apache.spark.sql.connector.catalog.functions.ScalarFunction;
+import org.apache.spark.sql.connector.catalog.functions.UnboundFunction;
+import org.apache.spark.sql.types.BinaryType;
+import org.apache.spark.sql.types.ByteType;
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.Decimal;
+import org.apache.spark.sql.types.DecimalType;
+import org.apache.spark.sql.types.IntegerType;
+import org.apache.spark.sql.types.LongType;
+import org.apache.spark.sql.types.ShortType;
+import org.apache.spark.sql.types.StringType;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.unsafe.types.UTF8String;
+
+/**
+ * A Spark function implementation for the Iceberg truncate transform.
+ *
+ * <p>Example usage: {@code SELECT system.truncate(1, 'abc')}, which returns the String 'a'.
+ *
+ * <p>Note that for performance reasons, the given input width is not validated in the
+ * implementations used in code-gen. The width must remain non-negative to give meaningful results.
+ */
+public class TruncateFunction implements UnboundFunction {
+
+  private static void validateTruncationWidthType(DataType widthType) {
+    if (!DataTypes.IntegerType.sameType(widthType)
+        && !DataTypes.ShortType.sameType(widthType)
+        && !DataTypes.ByteType.sameType(widthType)) {
+      throw new UnsupportedOperationException(
+          "Expected truncation width to be one of [ByteType, ShortType, IntegerType], but found "
+              + widthType);
+    }
+  }
+
+  @Override
+  public BoundFunction bind(StructType inputType) {
+    if (inputType.fields().length != 2) {
+      throw new UnsupportedOperationException(
+          "Cannot bind truncate: wrong number of inputs (expected width and value)");
+    }
+
+    StructField widthField = inputType.fields()[0];
+    StructField toTruncateField = inputType.fields()[1];
+    validateTruncationWidthType(widthField.dataType());
+
+    DataType toTruncateDataType = toTruncateField.dataType();
+    if (toTruncateDataType instanceof ByteType) {
+      return new TruncateTinyInt();
+    } else if (toTruncateDataType instanceof ShortType) {
+      return new TruncateSmallInt();
+    } else if (toTruncateDataType instanceof IntegerType) {
+      return new TruncateInt();
+    } else if (toTruncateDataType instanceof LongType) {
+      return new TruncateBigInt();
+    } else if (toTruncateDataType instanceof DecimalType) {
+      return new TruncateDecimal(
+          ((DecimalType) toTruncateDataType).precision(),
+          ((DecimalType) toTruncateDataType).scale());
+    } else if (toTruncateDataType instanceof StringType) {
+      return new TruncateString();
+    } else if (toTruncateDataType instanceof BinaryType) {
+      return new TruncateBinary();
+    } else {
+      throw new UnsupportedOperationException(
+          String.format(
+              "Invalid input type to truncate. Expected one of [ByteType, ShortType, IntegerType, LongType, "
+                  + "DecimalType, StringType, BinaryType], but found %s",
+              toTruncateDataType));
+    }
+  }
+
+  @Override
+  public String description() {
+    return name()
+        + "(width, col) - Call Iceberg's truncate transform\n"
+        + "  width :: width for truncation, e.g. truncate(10, 255) -> 250 (must be an integer)\n"
+        + "  col :: column to truncate (must be an integer, decimal, string, or binary)";
+  }
+
+  @Override
+  public String name() {
+    return "truncate";
+  }
+
+  public abstract static class TruncateBase<T> implements ScalarFunction<T> {
+    @Override
+    public String name() {
+      return "truncate";
+    }
+  }
+
+  public static class TruncateTinyInt extends TruncateBase<Byte> {
+    public static byte invoke(int width, byte value) {
+      return TruncateUtil.truncateByte(width, value);
+    }
+
+    @Override
+    public DataType[] inputTypes() {
+      return new DataType[] {DataTypes.IntegerType, DataTypes.ByteType};
+    }
+
+    @Override
+    public DataType resultType() {
+      return DataTypes.ByteType;
+    }
+
+    @Override
+    public String canonicalName() {
+      return "iceberg.truncate(tinyint)";
+    }
+
+    @Override
+    public Byte produceResult(InternalRow input) {
+      int width = readWidth(input);
+      return input.isNullAt(1) ? null : invoke(width, input.getByte(1));
+    }
+  }
+
+  public static class TruncateSmallInt extends TruncateBase<Short> {
+    // magic method used in codegen
+    public static short invoke(int width, short value) {
+      return TruncateUtil.truncateShort(width, value);
+    }
+
+    @Override
+    public DataType[] inputTypes() {
+      return new DataType[] {DataTypes.IntegerType, DataTypes.ShortType};
+    }
+
+    @Override
+    public DataType resultType() {
+      return DataTypes.ShortType;
+    }
+
+    @Override
+    public String canonicalName() {
+      return "iceberg.truncate(smallint)";
+    }
+
+    @Override
+    public Short produceResult(InternalRow input) {
+      int width = readWidth(input);
+      return input.isNullAt(1) ? null : invoke(width, input.getShort(1));
+    }
+  }
+
+  public static class TruncateInt extends TruncateBase<Integer> {
+    // magic method used in codegen
+    public static int invoke(int width, int value) {
+      return TruncateUtil.truncateInt(width, value);
+    }
+
+    @Override
+    public DataType[] inputTypes() {
+      return new DataType[] {DataTypes.IntegerType, DataTypes.IntegerType};
+    }
+
+    @Override
+    public DataType resultType() {
+      return DataTypes.IntegerType;
+    }
+
+    @Override
+    public String canonicalName() {
+      return "iceberg.truncate(int)";
+    }
+
+    @Override
+    public Integer produceResult(InternalRow input) {
+      int width = readWidth(input);
+      return input.isNullAt(1) ? null : invoke(width, input.getInt(1));
+    }
+  }
+
+  public static class TruncateBigInt extends TruncateBase<Long> {
+    // magic function for usage with codegen
+    public static long invoke(int width, long value) {
+      return TruncateUtil.truncateLong(width, value);
+    }
+
+    @Override
+    public DataType[] inputTypes() {
+      return new DataType[] {DataTypes.IntegerType, DataTypes.LongType};
+    }
+
+    @Override
+    public DataType resultType() {
+      return DataTypes.LongType;
+    }
+
+    @Override
+    public String canonicalName() {
+      return "iceberg.truncate(bigint)";
+    }
+
+    @Override
+    public Long produceResult(InternalRow input) {
+      int width = readWidth(input);
+      return input.isNullAt(1) ? null : invoke(width, input.getLong(1));
+    }
+  }
+
+  public static class TruncateString extends TruncateBase<UTF8String> {
+    // magic function for usage with codegen
+    public static UTF8String invoke(int width, UTF8String value) {
+      if (value == null) {
+        return null;
+      }
+
+      return value.substring(0, width);

Review Comment:
   I'd be up to using UTF8String directly. Do we have enough tests to verify that the behavior of the Spark function will match the behavior of the Iceberg transform?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org