You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2022/08/12 17:25:19 UTC

[GitHub] [iceberg] kbendick opened a new pull request, #5513: Spark - Add Bucket Function for FunctionCatalog

kbendick opened a new pull request, #5513:
URL: https://github.com/apache/iceberg/pull/5513

   Adds a `bucket` function to the Spark FunctionCatalog that performs the Iceberg bucket partition transformation, for use from SQL and with storage-partitioned joins.
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a diff in pull request #5513: Spark 3.3 - Support bucket in FunctionCatalog

Posted by GitBox <gi...@apache.org>.
rdblue commented on code in PR #5513:
URL: https://github.com/apache/iceberg/pull/5513#discussion_r945206541


##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/functions/BucketFunction.java:
##########
@@ -0,0 +1,308 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.functions;
+
+import java.nio.ByteBuffer;
+import java.util.Set;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
+import org.apache.iceberg.util.BucketHashUtil;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.connector.catalog.functions.BoundFunction;
+import org.apache.spark.sql.connector.catalog.functions.ScalarFunction;
+import org.apache.spark.sql.connector.catalog.functions.UnboundFunction;
+import org.apache.spark.sql.types.BinaryType;
+import org.apache.spark.sql.types.ByteType;
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.DateType;
+import org.apache.spark.sql.types.Decimal;
+import org.apache.spark.sql.types.DecimalType;
+import org.apache.spark.sql.types.IntegerType;
+import org.apache.spark.sql.types.LongType;
+import org.apache.spark.sql.types.ShortType;
+import org.apache.spark.sql.types.StringType;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.types.TimestampType;
+import org.apache.spark.unsafe.types.UTF8String;
+
+/**
+ * A Spark function implementation for the Iceberg bucket transform.
+ *
+ * <p>Example usage: {@code SELECT system.bucket(1, 'abc')}, which returns the bucket.
+ *
+ * <p>Note that for performance reasons, the given input number of buckets is not validated in the
+ * implementations used in code-gen. The number of buckets must be positive to give meaningful
+ * results.
+ */
+public class BucketFunction implements UnboundFunction {
+  private static final int NUM_BUCKETS_ORDINAL = 0;
+  private static final int VALUE_ORDINAL = 1;
+  private static final Set<DataType> SUPPORTED_NUM_BUCKETS_TYPES =
+      ImmutableSet.of(DataTypes.ByteType, DataTypes.ShortType, DataTypes.IntegerType);
+
+  @Override
+  public BoundFunction bind(StructType inputType) {
+    if (inputType.size() != 2) {
+      throw new UnsupportedOperationException(
+          "Wrong number of inputs (expected numBuckets and value)");
+    }
+
+    StructField numBucketsField = inputType.fields()[NUM_BUCKETS_ORDINAL];
+    StructField valueField = inputType.fields()[VALUE_ORDINAL];
+
+    if (!SUPPORTED_NUM_BUCKETS_TYPES.contains(numBucketsField.dataType())) {
+      throw new UnsupportedOperationException(
+          "Expected number of buckets to be tinyint, shortint or int");
+    }
+
+    DataType type = valueField.dataType();
+    if (type instanceof DateType) {
+      return new BucketInt(DataTypes.DateType);
+    } else if (type instanceof ByteType
+        || type instanceof ShortType
+        || type instanceof IntegerType) {
+      return new BucketInt(DataTypes.IntegerType);
+    } else if (type instanceof LongType) {
+      return new BucketLong(DataTypes.LongType);
+    } else if (type instanceof TimestampType) {
+      return new BucketLong(DataTypes.TimestampType);
+    } else if (type instanceof DecimalType) {
+      return new BucketDecimal(((DecimalType) type).precision(), ((DecimalType) type).scale());
+    } else if (type instanceof StringType) {
+      return new BucketString();
+    } else if (type instanceof BinaryType) {
+      return new BucketBinary();
+    } else {
+      throw new UnsupportedOperationException(
+          "Expected bucketed column to be date, tinyint, smallint, int, bigint, decimal, timestamp, string, or binary");
+    }
+  }
+
+  @Override
+  public String description() {
+    return name()
+        + "(numBuckets, col) - Call Iceberg's bucket transform\n"
+        + "  numBuckets :: number of buckets to divide the rows into, e.g. bucket(100, 34) -> 79 (must be a tinyint, smallint, or int)\n"
+        + "  col :: column to bucket (must be a date, integer, long, timestamp, decimal, string, or binary)";
+  }
+
+  @Override
+  public String name() {
+    return "bucket";
+  }
+
+  public abstract static class BucketBase implements ScalarFunction<Integer> {
+    @Override
+    public String name() {
+      return "bucket";
+    }
+
+    @Override
+    public DataType resultType() {
+      return DataTypes.IntegerType;
+    }
+  }
+
+  // Used for both int and date - tinyint and smallint are upcasted to int by Spark.
+  public static class BucketInt extends BucketBase {
+    private final DataType sqlType;
+
+    // magic method used in codegen
+    public static int invoke(int numBuckets, int value) {
+      return (BucketHashUtil.forInteger(value) & Integer.MAX_VALUE) % numBuckets;
+    }
+
+    public BucketInt(DataType sqlType) {
+      this.sqlType = sqlType;
+    }
+
+    @Override
+    public Integer produceResult(InternalRow input) {
+      // return null for null input to match what Spark does in the code-generated versions.
+      return input.isNullAt(NUM_BUCKETS_ORDINAL) || input.isNullAt(VALUE_ORDINAL)
+          ? null
+          : invoke(input.getInt(NUM_BUCKETS_ORDINAL), input.getInt(VALUE_ORDINAL));
+    }
+
+    @Override
+    public DataType[] inputTypes() {
+      return new DataType[] {DataTypes.IntegerType, sqlType};
+    }
+
+    @Override
+    public String canonicalName() {
+      return String.format("iceberg.bucket(%s)", sqlType.catalogString());
+    }
+  }
+
+  // Used for both BigInt and Timestamp
+  public static class BucketLong extends BucketBase {
+    private final DataType sqlType;
+
+    // magic function for usage with codegen - needs to be static
+    public static int invoke(int numBuckets, long value) {
+      return (BucketHashUtil.forLong(value) & Integer.MAX_VALUE) % numBuckets;
+    }
+
+    public BucketLong(DataType sqlType) {
+      this.sqlType = sqlType;
+    }
+
+    @Override
+    public DataType[] inputTypes() {
+      return new DataType[] {DataTypes.IntegerType, sqlType};
+    }
+
+    @Override
+    public Integer produceResult(InternalRow input) {
+      // return null for null input to match what Spark does in the code-generated versions.
+      return input.isNullAt(NUM_BUCKETS_ORDINAL) || input.isNullAt(VALUE_ORDINAL)
+          ? null
+          : invoke(input.getInt(NUM_BUCKETS_ORDINAL), input.getLong(VALUE_ORDINAL));
+    }
+
+    @Override
+    public String canonicalName() {
+      return String.format("iceberg.bucket(%s)", sqlType.catalogString());
+    }
+  }
+
+  // bucketing by Float is not allowed by the spec, but this has the float hash implementation
+  public static class BucketFloat extends BucketBase {

Review Comment:
   Yeah, please remove this.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] kbendick commented on a diff in pull request #5513: Spark 3.3 - Support bucket in FunctionCatalog

Posted by GitBox <gi...@apache.org>.
kbendick commented on code in PR #5513:
URL: https://github.com/apache/iceberg/pull/5513#discussion_r947194009


##########
spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/sql/TestSparkBucketFunction.java:
##########
@@ -0,0 +1,330 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.sql;
+
+import java.nio.charset.StandardCharsets;
+import org.apache.iceberg.AssertHelpers;
+import org.apache.iceberg.relocated.com.google.common.io.BaseEncoding;
+import org.apache.iceberg.spark.SparkTestBaseWithCatalog;
+import org.apache.spark.sql.AnalysisException;
+import org.assertj.core.api.Assertions;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestSparkBucketFunction extends SparkTestBaseWithCatalog {
+  public TestSparkBucketFunction() {}
+
+  @Before
+  public void useCatalog() {
+    sql("USE %s", catalogName);
+  }
+
+  @Test
+  public void testBucketIntegers() {
+    Assert.assertEquals(
+        "Byte type should bucket similarly to integer",
+        3,
+        scalarSql("SELECT system.bucket(10, 8Y)"));
+    Assert.assertEquals(
+        "Short type should bucket similarly to integer",
+        3,
+        scalarSql("SELECT system.bucket(10, 8S)"));
+    // Integers
+    Assert.assertEquals(3, scalarSql("SELECT system.bucket(10, 8)"));
+    Assert.assertEquals(79, scalarSql("SELECT system.bucket(100, 34)"));
+    Assert.assertNull(scalarSql("SELECT system.bucket(1, CAST(null AS INT))"));
+  }
+
+  @Test
+  public void testBucketDates() {
+    Assert.assertEquals(3, scalarSql("SELECT system.bucket(10, date('1970-01-09'))"));
+    Assert.assertEquals(79, scalarSql("SELECT system.bucket(100, date('1970-02-04'))"));
+    Assert.assertNull(scalarSql("SELECT system.bucket(1, CAST(null AS DATE))"));
+  }
+
+  @Test
+  public void testBucketLong() {
+    Assert.assertEquals(79, scalarSql("SELECT system.bucket(100, 34L)"));
+    Assert.assertEquals(76, scalarSql("SELECT system.bucket(100, 0L)"));
+    Assert.assertEquals(97, scalarSql("SELECT system.bucket(100, -34L)"));
+    Assert.assertEquals(0, scalarSql("SELECT system.bucket(2, -1L)"));
+    Assert.assertNull(scalarSql("SELECT system.bucket(2, CAST(null AS LONG))"));
+  }
+
+  @Test
+  public void testBucketDecimal() {
+    Assert.assertEquals(56, scalarSql("SELECT system.bucket(64, CAST('12.34' as DECIMAL(9, 2)))"));
+    Assert.assertEquals(13, scalarSql("SELECT system.bucket(18, CAST('12.30' as DECIMAL(9, 2)))"));
+    Assert.assertEquals(2, scalarSql("SELECT system.bucket(16, CAST('12.999' as DECIMAL(9, 3)))"));
+    Assert.assertEquals(21, scalarSql("SELECT system.bucket(32, CAST('0.05' as DECIMAL(5, 2)))"));
+    Assert.assertEquals(85, scalarSql("SELECT system.bucket(128, CAST('0.05' as DECIMAL(9, 2)))"));
+    Assert.assertEquals(3, scalarSql("SELECT system.bucket(18, CAST('0.05' as DECIMAL(9, 2)))"));
+
+    Assert.assertNull(
+        "Null input should return null",
+        scalarSql("SELECT system.bucket(2, CAST(null AS decimal))"));
+  }
+
+  @Test
+  public void testBucketTimestamp() {
+    Assert.assertEquals(
+        99, scalarSql("SELECT system.bucket(100, TIMESTAMP '1997-01-01 00:00:00 UTC+00:00')"));
+    Assert.assertEquals(
+        85, scalarSql("SELECT system.bucket(100, TIMESTAMP '1997-01-31 09:26:56 UTC+00:00')"));
+    Assert.assertEquals(
+        62, scalarSql("SELECT system.bucket(100, TIMESTAMP '2022-08-08 00:00:00 UTC+00:00')"));
+    Assert.assertNull(scalarSql("SELECT system.bucket(2, CAST(null AS timestamp))"));
+  }
+
+  @Test
+  public void testBucketString() {
+    Assert.assertEquals(4, scalarSql("SELECT system.bucket(5, 'abcdefg')"));
+    Assert.assertEquals(122, scalarSql("SELECT system.bucket(128, 'abc')"));
+    Assert.assertEquals(54, scalarSql("SELECT system.bucket(64, 'abcde')"));
+    Assert.assertEquals(8, scalarSql("SELECT system.bucket(12, '测试')"));
+    Assert.assertEquals(1, scalarSql("SELECT system.bucket(16, '测试raul试测')"));
+    Assert.assertEquals(
+        "Varchar should work like string",
+        1,
+        scalarSql("SELECT system.bucket(16, CAST('测试raul试测' AS varchar(8)))"));
+    Assert.assertEquals(
+        "Char should work like string",
+        1,
+        scalarSql("SELECT system.bucket(16, CAST('测试raul试测' AS char(8)))"));
+    Assert.assertEquals(
+        "Should not fail on the empty string", 0, scalarSql("SELECT system.bucket(16, '')"));
+    Assert.assertNull(
+        "Null input should return null as output",
+        scalarSql("SELECT system.bucket(16, CAST(null AS string))"));
+  }
+
+  @Test
+  public void testBucketBinary() {
+    Assert.assertEquals(
+        1, scalarSql("SELECT system.bucket(10, X'0102030405060708090a0b0c0d0e0f')"));
+    Assert.assertEquals(10, scalarSql("SELECT system.bucket(12, %s)", asBytesLiteral("abcdefg")));
+    Assert.assertEquals(13, scalarSql("SELECT system.bucket(18, %s)", asBytesLiteral("abc\0\0")));
+    Assert.assertEquals(42, scalarSql("SELECT system.bucket(48, %s)", asBytesLiteral("abc")));
+    Assert.assertEquals(3, scalarSql("SELECT system.bucket(16, %s)", asBytesLiteral("测试_")));

Review Comment:
   Correction. The bucket function test suite uses a seeded random to generate data.
   
   I did, however, test the output of all of the values from the spec against their hashed output. Let me know if that's sufficient, or if I should update these tests to check against the `hash` output instead (or both).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] kbendick commented on a diff in pull request #5513: Spark 3.3 - Support bucket in FunctionCatalog

Posted by GitBox <gi...@apache.org>.
kbendick commented on code in PR #5513:
URL: https://github.com/apache/iceberg/pull/5513#discussion_r944968515


##########
api/src/main/java/org/apache/iceberg/util/BucketHashUtil.java:
##########
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.util;
+
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.UUID;
+import org.apache.iceberg.relocated.com.google.common.hash.HashFunction;
+import org.apache.iceberg.relocated.com.google.common.hash.Hashing;
+
+/**
+ * Contains the logic for hashing various types for use with the {@code bucket} partition
+ * transformations
+ */
+public class BucketHashUtil {
+
+  private static final HashFunction MURMUR3 = Hashing.murmur3_32_fixed();
+
+  private BucketHashUtil() {}
+
+  public static int forInteger(Integer value) {

Review Comment:
   Renamed them to `hashXXXX`, e.g. `hashInteger` etc.



##########
api/src/main/java/org/apache/iceberg/util/BucketHashUtil.java:
##########
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.util;
+
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.UUID;
+import org.apache.iceberg.relocated.com.google.common.hash.HashFunction;
+import org.apache.iceberg.relocated.com.google.common.hash.Hashing;
+
+/**
+ * Contains the logic for hashing various types for use with the {@code bucket} partition
+ * transformations
+ */
+public class BucketHashUtil {

Review Comment:
   Renamed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] kbendick commented on a diff in pull request #5513: Spark 3.3 - Support bucket in FunctionCatalog

Posted by GitBox <gi...@apache.org>.
kbendick commented on code in PR #5513:
URL: https://github.com/apache/iceberg/pull/5513#discussion_r946026125


##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/functions/BucketFunction.java:
##########
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.functions;
+
+import java.nio.ByteBuffer;
+import java.util.Set;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
+import org.apache.iceberg.util.BucketUtil;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.connector.catalog.functions.BoundFunction;
+import org.apache.spark.sql.connector.catalog.functions.ScalarFunction;
+import org.apache.spark.sql.connector.catalog.functions.UnboundFunction;
+import org.apache.spark.sql.types.BinaryType;
+import org.apache.spark.sql.types.ByteType;
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.DateType;
+import org.apache.spark.sql.types.Decimal;
+import org.apache.spark.sql.types.DecimalType;
+import org.apache.spark.sql.types.IntegerType;
+import org.apache.spark.sql.types.LongType;
+import org.apache.spark.sql.types.ShortType;
+import org.apache.spark.sql.types.StringType;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.types.TimestampType;
+import org.apache.spark.unsafe.types.UTF8String;
+
+/**
+ * A Spark function implementation for the Iceberg bucket transform.
+ *
+ * <p>Example usage: {@code SELECT system.bucket(128, 'abc')}, which returns the bucket 122.
+ *
+ * <p>Note that for performance reasons, the given input number of buckets is not validated in the
+ * implementations used in code-gen. The number of buckets must be positive to give meaningful
+ * results.
+ */
+public class BucketFunction implements UnboundFunction {
+  private static final int NUM_BUCKETS_ORDINAL = 0;
+  private static final int VALUE_ORDINAL = 1;
+  private static final Set<DataType> SUPPORTED_NUM_BUCKETS_TYPES =
+      ImmutableSet.of(DataTypes.ByteType, DataTypes.ShortType, DataTypes.IntegerType);
+
+  @Override
+  public BoundFunction bind(StructType inputType) {
+    if (inputType.size() != 2) {
+      throw new UnsupportedOperationException(
+          "Wrong number of inputs (expected numBuckets and value)");
+    }
+
+    StructField numBucketsField = inputType.fields()[NUM_BUCKETS_ORDINAL];
+    StructField valueField = inputType.fields()[VALUE_ORDINAL];
+
+    if (!SUPPORTED_NUM_BUCKETS_TYPES.contains(numBucketsField.dataType())) {
+      throw new UnsupportedOperationException(
+          "Expected number of buckets to be tinyint, shortint or int");
+    }
+
+    // TODO - Support UUID type.
+    DataType type = valueField.dataType();
+    if (type instanceof DateType) {
+      return new BucketInt(DataTypes.DateType);
+    } else if (type instanceof ByteType
+        || type instanceof ShortType
+        || type instanceof IntegerType) {
+      return new BucketInt(DataTypes.IntegerType);
+    } else if (type instanceof LongType) {
+      return new BucketLong(DataTypes.LongType);
+    } else if (type instanceof TimestampType) {
+      return new BucketLong(DataTypes.TimestampType);
+    } else if (type instanceof DecimalType) {
+      return new BucketDecimal(((DecimalType) type).precision(), ((DecimalType) type).scale());
+    } else if (type instanceof StringType) {
+      return new BucketString();
+    } else if (type instanceof BinaryType) {
+      return new BucketBinary();
+    } else {
+      throw new UnsupportedOperationException(
+          "Expected bucketed column to be date, tinyint, smallint, int, bigint, decimal, timestamp, string, or binary");
+    }
+  }
+
+  @Override
+  public String description() {
+    return name()
+        + "(numBuckets, col) - Call Iceberg's bucket transform\n"
+        + "  numBuckets :: number of buckets to divide the rows into, e.g. bucket(100, 34) -> 79 (must be a tinyint, smallint, or int)\n"
+        + "  col :: column to bucket (must be a date, integer, long, timestamp, decimal, string, or binary)";
+  }
+
+  @Override
+  public String name() {
+    return "bucket";
+  }
+
+  public abstract static class BucketBase implements ScalarFunction<Integer> {
+    @Override
+    public String name() {
+      return "bucket";
+    }
+
+    @Override
+    public DataType resultType() {
+      return DataTypes.IntegerType;
+    }
+  }
+
+  // Used for both int and date - tinyint and smallint are upcasted to int by Spark.
+  public static class BucketInt extends BucketBase {
+    private final DataType sqlType;
+
+    // magic method used in codegen
+    public static int invoke(int numBuckets, int value) {
+      return (BucketUtil.hashInteger(value) & Integer.MAX_VALUE) % numBuckets;
+    }
+
+    public BucketInt(DataType sqlType) {
+      this.sqlType = sqlType;
+    }
+
+    @Override
+    public DataType[] inputTypes() {
+      return new DataType[] {DataTypes.IntegerType, sqlType};
+    }
+
+    @Override
+    public String canonicalName() {
+      return String.format("iceberg.bucket(%s)", sqlType.catalogString());
+    }
+
+    @Override
+    public Integer produceResult(InternalRow input) {
+      // return null for null input to match what Spark does in the code-generated versions.
+      return input.isNullAt(NUM_BUCKETS_ORDINAL) || input.isNullAt(VALUE_ORDINAL)
+          ? null
+          : invoke(input.getInt(NUM_BUCKETS_ORDINAL), input.getInt(VALUE_ORDINAL));
+    }
+  }
+
+  // Used for both BigInt and Timestamp
+  public static class BucketLong extends BucketBase {
+    private final DataType sqlType;
+
+    // magic function for usage with codegen - needs to be static
+    public static int invoke(int numBuckets, long value) {
+      return (BucketUtil.hashLong(value) & Integer.MAX_VALUE) % numBuckets;
+    }
+
+    public BucketLong(DataType sqlType) {
+      this.sqlType = sqlType;
+    }
+
+    @Override
+    public DataType[] inputTypes() {
+      return new DataType[] {DataTypes.IntegerType, sqlType};
+    }
+
+    @Override
+    public String canonicalName() {
+      return String.format("iceberg.bucket(%s)", sqlType.catalogString());
+    }
+
+    @Override
+    public Integer produceResult(InternalRow input) {
+      return input.isNullAt(NUM_BUCKETS_ORDINAL) || input.isNullAt(VALUE_ORDINAL)
+          ? null
+          : invoke(input.getInt(NUM_BUCKETS_ORDINAL), input.getLong(VALUE_ORDINAL));
+    }
+  }
+
+  // bucketing by Float is not allowed by the spec, but this has the float hash implementation
+  public static class BucketFloat extends BucketBase {
+
+    public static int invoke(int numBuckets, float value) {
+      return (BucketUtil.hashFloat(value) & Integer.MAX_VALUE) % numBuckets;
+    }
+
+    @Override
+    public DataType[] inputTypes() {
+      return new DataType[] {DataTypes.IntegerType, DataTypes.FloatType};
+    }
+
+    @Override
+    public String canonicalName() {
+      return "iceberg.bucket(float)";
+    }
+
+    @Override
+    public Integer produceResult(InternalRow input) {
+      return input.isNullAt(NUM_BUCKETS_ORDINAL) || input.isNullAt(VALUE_ORDINAL)
+          ? null
+          : invoke(input.getInt(NUM_BUCKETS_ORDINAL), input.getFloat(VALUE_ORDINAL));
+    }
+  }
+
+  public static class BucketString extends BucketBase {
+    // magic function for usage with codegen
+    public static Integer invoke(int numBuckets, UTF8String value) {
+      if (value == null) {
+        return null;
+      }
+
+      return (BucketUtil.hashCharSequence(value.toString()) & Integer.MAX_VALUE) % numBuckets;
+    }
+
+    @Override
+    public DataType[] inputTypes() {
+      return new DataType[] {DataTypes.IntegerType, DataTypes.StringType};
+    }
+
+    @Override
+    public String canonicalName() {
+      return "iceberg.bucket(string)";
+    }
+
+    @Override
+    public Integer produceResult(InternalRow input) {
+      return input.isNullAt(NUM_BUCKETS_ORDINAL) || input.isNullAt(VALUE_ORDINAL)
+          ? null
+          : invoke(input.getInt(NUM_BUCKETS_ORDINAL), input.getUTF8String(VALUE_ORDINAL));
+    }
+  }
+
+  public static class BucketBinary extends BucketBase {
+    public static Integer invoke(int numBuckets, byte[] value) {
+      if (value == null) {
+        return null;
+      }
+
+      return (BucketUtil.hashByteBuffer(ByteBuffer.wrap(value)) & Integer.MAX_VALUE) % numBuckets;
+    }
+
+    @Override
+    public DataType[] inputTypes() {
+      return new DataType[] {DataTypes.IntegerType, DataTypes.BinaryType};
+    }
+
+    @Override
+    public Integer produceResult(InternalRow input) {
+      return input.isNullAt(NUM_BUCKETS_ORDINAL) || input.isNullAt(VALUE_ORDINAL)
+          ? null
+          : invoke(input.getInt(NUM_BUCKETS_ORDINAL), input.getBinary(VALUE_ORDINAL));
+    }
+
+    @Override
+    public String canonicalName() {
+      return "iceberg.bucket(binary)";
+    }
+  }
+
+  public static class BucketDecimal extends BucketBase {
+    private final int precision;
+    private final int scale;
+
+    // magic method used in codegen
+    public static Integer invoke(int numBuckets, Decimal value) {
+      if (value == null) {
+        return null;
+      }
+
+      return (BucketUtil.hashDecimal(value.toJavaBigDecimal()) & Integer.MAX_VALUE) % numBuckets;
+    }
+
+    public BucketDecimal(int precision, int scale) {

Review Comment:
   The hash function is indeed independent of `precision` and `scale`. I've updated the canonical name.
   
   But we do also need the `precision` and `scale` to get the `Decimal` value in `produceResult`, and it seems we need them for creating the correct `inputType`, as mentioned. Let me see about passing in the Spark type instead of passing those in.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] kbendick commented on a diff in pull request #5513: Spark 3.3 - Support bucket in FunctionCatalog

Posted by GitBox <gi...@apache.org>.
kbendick commented on code in PR #5513:
URL: https://github.com/apache/iceberg/pull/5513#discussion_r947187694


##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/functions/BucketFunction.java:
##########
@@ -0,0 +1,310 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.functions;
+
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.util.Set;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
+import org.apache.iceberg.util.BucketUtil;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.connector.catalog.functions.BoundFunction;
+import org.apache.spark.sql.connector.catalog.functions.ScalarFunction;
+import org.apache.spark.sql.connector.catalog.functions.UnboundFunction;
+import org.apache.spark.sql.types.BinaryType;
+import org.apache.spark.sql.types.ByteType;
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.DateType;
+import org.apache.spark.sql.types.Decimal;
+import org.apache.spark.sql.types.DecimalType;
+import org.apache.spark.sql.types.IntegerType;
+import org.apache.spark.sql.types.LongType;
+import org.apache.spark.sql.types.ShortType;
+import org.apache.spark.sql.types.StringType;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.types.TimestampType;
+import org.apache.spark.unsafe.types.UTF8String;
+
+/**
+ * A Spark function implementation for the Iceberg bucket transform.
+ *
+ * <p>Example usage: {@code SELECT system.bucket(128, 'abc')}, which returns the bucket 122.
+ *
+ * <p>Note that for performance reasons, the given input number of buckets is not validated in the
+ * implementations used in code-gen. The number of buckets must be positive to give meaningful
+ * results.
+ */
+public class BucketFunction implements UnboundFunction {
+  private static final int NUM_BUCKETS_ORDINAL = 0;
+  private static final int VALUE_ORDINAL = 1;
+  private static final Set<DataType> SUPPORTED_NUM_BUCKETS_TYPES =
+      ImmutableSet.of(DataTypes.ByteType, DataTypes.ShortType, DataTypes.IntegerType);
+
+  @Override
+  public BoundFunction bind(StructType inputType) {
+    if (inputType.size() != 2) {
+      throw new UnsupportedOperationException(
+          "Wrong number of inputs (expected numBuckets and value)");
+    }
+
+    StructField numBucketsField = inputType.fields()[NUM_BUCKETS_ORDINAL];
+    StructField valueField = inputType.fields()[VALUE_ORDINAL];
+
+    if (!SUPPORTED_NUM_BUCKETS_TYPES.contains(numBucketsField.dataType())) {
+      throw new UnsupportedOperationException(
+          "Expected number of buckets to be tinyint, shortint or int");
+    }
+
+    DataType type = valueField.dataType();
+    if (type instanceof DateType) {
+      return new BucketInt(type);
+    } else if (type instanceof ByteType
+        || type instanceof ShortType
+        || type instanceof IntegerType) {
+      return new BucketInt(DataTypes.IntegerType);
+    } else if (type instanceof LongType) {
+      return new BucketLong(type);
+    } else if (type instanceof TimestampType) {
+      return new BucketLong(type);
+    } else if (type instanceof DecimalType) {
+      return new BucketDecimal(type);
+    } else if (type instanceof StringType) {
+      return new BucketString();
+    } else if (type instanceof BinaryType) {
+      return new BucketBinary();
+    } else {
+      throw new UnsupportedOperationException(
+          "Expected column to be date, tinyint, smallint, int, bigint, decimal, timestamp, string, or binary");
+    }
+  }
+
+  @Override
+  public String description() {
+    return name()
+        + "(numBuckets, col) - Call Iceberg's bucket transform\n"
+        + "  numBuckets :: number of buckets to divide the rows into, e.g. bucket(100, 34) -> 79 (must be a tinyint, smallint, or int)\n"
+        + "  col :: column to bucket (must be a date, integer, long, timestamp, decimal, string, or binary)";
+  }
+
+  @Override
+  public String name() {
+    return "bucket";
+  }
+
+  public abstract static class BucketBase implements ScalarFunction<Integer> {
+    public static int apply(int numBuckets, int hashedValue) {
+      return (hashedValue & Integer.MAX_VALUE) % numBuckets;
+    }
+
+    @Override
+    public String name() {
+      return "bucket";
+    }
+
+    @Override
+    public DataType resultType() {
+      return DataTypes.IntegerType;
+    }
+  }
+
+  // Used for both int and date - tinyint and smallint are upcasted to int by Spark.
+  public static class BucketInt extends BucketBase {
+    private final DataType sqlType;
+
+    // magic method used in codegen
+    public static int invoke(int numBuckets, int value) {
+      return apply(numBuckets, hash(value));
+    }
+
+    // Visible for testing
+    public static int hash(int value) {
+      return BucketUtil.hash(value);
+    }
+
+    public BucketInt(DataType sqlType) {
+      this.sqlType = sqlType;
+    }
+
+    @Override
+    public DataType[] inputTypes() {
+      return new DataType[] {DataTypes.IntegerType, sqlType};
+    }
+
+    @Override
+    public String canonicalName() {
+      return String.format("iceberg.bucket(%s)", sqlType.catalogString());
+    }
+
+    @Override
+    public Integer produceResult(InternalRow input) {
+      // return null for null input to match what Spark does in the code-generated versions.
+      return input.isNullAt(NUM_BUCKETS_ORDINAL) || input.isNullAt(VALUE_ORDINAL)
+          ? null
+          : invoke(input.getInt(NUM_BUCKETS_ORDINAL), input.getInt(VALUE_ORDINAL));
+    }
+  }
+
+  // Used for both BigInt and Timestamp
+  public static class BucketLong extends BucketBase {
+    private final DataType sqlType;
+
+    // magic function for usage with codegen - needs to be static
+    public static int invoke(int numBuckets, long value) {
+      return apply(numBuckets, hash(value));
+    }
+
+    // Visible for testing
+    public static int hash(long value) {
+      return BucketUtil.hash(value);
+    }
+
+    public BucketLong(DataType sqlType) {
+      this.sqlType = sqlType;
+    }
+
+    @Override
+    public DataType[] inputTypes() {
+      return new DataType[] {DataTypes.IntegerType, sqlType};
+    }
+
+    @Override
+    public String canonicalName() {
+      return String.format("iceberg.bucket(%s)", sqlType.catalogString());
+    }
+
+    @Override
+    public Integer produceResult(InternalRow input) {
+      return input.isNullAt(NUM_BUCKETS_ORDINAL) || input.isNullAt(VALUE_ORDINAL)
+          ? null
+          : invoke(input.getInt(NUM_BUCKETS_ORDINAL), input.getLong(VALUE_ORDINAL));
+    }
+  }
+
+  public static class BucketString extends BucketBase {
+    // magic function for usage with codegen
+    public static Integer invoke(int numBuckets, UTF8String value) {
+      if (value == null) {
+        return null;
+      }
+
+      // TODO - We can probably hash the bytes directly given they're already UTF-8 input.
+      return apply(numBuckets, hash(value.toString()));

Review Comment:
   But this might make the testing of the `hash` function harder. Let me check.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a diff in pull request #5513: Spark 3.3 - Support bucket in FunctionCatalog

Posted by GitBox <gi...@apache.org>.
rdblue commented on code in PR #5513:
URL: https://github.com/apache/iceberg/pull/5513#discussion_r945206596


##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/functions/BucketFunction.java:
##########
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.functions;
+
+import java.nio.ByteBuffer;
+import java.util.Set;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
+import org.apache.iceberg.util.BucketUtil;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.connector.catalog.functions.BoundFunction;
+import org.apache.spark.sql.connector.catalog.functions.ScalarFunction;
+import org.apache.spark.sql.connector.catalog.functions.UnboundFunction;
+import org.apache.spark.sql.types.BinaryType;
+import org.apache.spark.sql.types.ByteType;
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.DateType;
+import org.apache.spark.sql.types.Decimal;
+import org.apache.spark.sql.types.DecimalType;
+import org.apache.spark.sql.types.IntegerType;
+import org.apache.spark.sql.types.LongType;
+import org.apache.spark.sql.types.ShortType;
+import org.apache.spark.sql.types.StringType;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.types.TimestampType;
+import org.apache.spark.unsafe.types.UTF8String;
+
+/**
+ * A Spark function implementation for the Iceberg bucket transform.
+ *
+ * <p>Example usage: {@code SELECT system.bucket(128, 'abc')}, which returns the bucket 122.
+ *
+ * <p>Note that for performance reasons, the given input number of buckets is not validated in the
+ * implementations used in code-gen. The number of buckets must be positive to give meaningful
+ * results.
+ */
+public class BucketFunction implements UnboundFunction {
+  private static final int NUM_BUCKETS_ORDINAL = 0;
+  private static final int VALUE_ORDINAL = 1;
+  private static final Set<DataType> SUPPORTED_NUM_BUCKETS_TYPES =
+      ImmutableSet.of(DataTypes.ByteType, DataTypes.ShortType, DataTypes.IntegerType);
+
+  @Override
+  public BoundFunction bind(StructType inputType) {
+    if (inputType.size() != 2) {
+      throw new UnsupportedOperationException(
+          "Wrong number of inputs (expected numBuckets and value)");
+    }
+
+    StructField numBucketsField = inputType.fields()[NUM_BUCKETS_ORDINAL];
+    StructField valueField = inputType.fields()[VALUE_ORDINAL];
+
+    if (!SUPPORTED_NUM_BUCKETS_TYPES.contains(numBucketsField.dataType())) {
+      throw new UnsupportedOperationException(
+          "Expected number of buckets to be tinyint, shortint or int");
+    }
+
+    // TODO - Support UUID type.
+    DataType type = valueField.dataType();
+    if (type instanceof DateType) {
+      return new BucketInt(DataTypes.DateType);
+    } else if (type instanceof ByteType
+        || type instanceof ShortType
+        || type instanceof IntegerType) {
+      return new BucketInt(DataTypes.IntegerType);
+    } else if (type instanceof LongType) {
+      return new BucketLong(DataTypes.LongType);
+    } else if (type instanceof TimestampType) {
+      return new BucketLong(DataTypes.TimestampType);
+    } else if (type instanceof DecimalType) {
+      return new BucketDecimal(((DecimalType) type).precision(), ((DecimalType) type).scale());
+    } else if (type instanceof StringType) {
+      return new BucketString();
+    } else if (type instanceof BinaryType) {
+      return new BucketBinary();
+    } else {
+      throw new UnsupportedOperationException(
+          "Expected bucketed column to be date, tinyint, smallint, int, bigint, decimal, timestamp, string, or binary");
+    }
+  }
+
+  @Override
+  public String description() {
+    return name()
+        + "(numBuckets, col) - Call Iceberg's bucket transform\n"
+        + "  numBuckets :: number of buckets to divide the rows into, e.g. bucket(100, 34) -> 79 (must be a tinyint, smallint, or int)\n"
+        + "  col :: column to bucket (must be a date, integer, long, timestamp, decimal, string, or binary)";
+  }
+
+  @Override
+  public String name() {
+    return "bucket";
+  }
+
+  public abstract static class BucketBase implements ScalarFunction<Integer> {
+    @Override
+    public String name() {
+      return "bucket";
+    }
+
+    @Override
+    public DataType resultType() {
+      return DataTypes.IntegerType;
+    }
+  }
+
+  // Used for both int and date - tinyint and smallint are upcasted to int by Spark.
+  public static class BucketInt extends BucketBase {
+    private final DataType sqlType;
+
+    // magic method used in codegen
+    public static int invoke(int numBuckets, int value) {
+      return (BucketUtil.hashInteger(value) & Integer.MAX_VALUE) % numBuckets;
+    }
+
+    public BucketInt(DataType sqlType) {
+      this.sqlType = sqlType;
+    }
+
+    @Override
+    public DataType[] inputTypes() {
+      return new DataType[] {DataTypes.IntegerType, sqlType};
+    }
+
+    @Override
+    public String canonicalName() {
+      return String.format("iceberg.bucket(%s)", sqlType.catalogString());
+    }
+
+    @Override
+    public Integer produceResult(InternalRow input) {
+      // return null for null input to match what Spark does in the code-generated versions.
+      return input.isNullAt(NUM_BUCKETS_ORDINAL) || input.isNullAt(VALUE_ORDINAL)
+          ? null
+          : invoke(input.getInt(NUM_BUCKETS_ORDINAL), input.getInt(VALUE_ORDINAL));
+    }
+  }
+
+  // Used for both BigInt and Timestamp
+  public static class BucketLong extends BucketBase {
+    private final DataType sqlType;
+
+    // magic function for usage with codegen - needs to be static
+    public static int invoke(int numBuckets, long value) {
+      return (BucketUtil.hashLong(value) & Integer.MAX_VALUE) % numBuckets;
+    }
+
+    public BucketLong(DataType sqlType) {
+      this.sqlType = sqlType;
+    }
+
+    @Override
+    public DataType[] inputTypes() {
+      return new DataType[] {DataTypes.IntegerType, sqlType};
+    }
+
+    @Override
+    public String canonicalName() {
+      return String.format("iceberg.bucket(%s)", sqlType.catalogString());
+    }
+
+    @Override
+    public Integer produceResult(InternalRow input) {
+      return input.isNullAt(NUM_BUCKETS_ORDINAL) || input.isNullAt(VALUE_ORDINAL)
+          ? null
+          : invoke(input.getInt(NUM_BUCKETS_ORDINAL), input.getLong(VALUE_ORDINAL));
+    }
+  }
+
+  // bucketing by Float is not allowed by the spec, but this has the float hash implementation
+  public static class BucketFloat extends BucketBase {
+
+    public static int invoke(int numBuckets, float value) {
+      return (BucketUtil.hashFloat(value) & Integer.MAX_VALUE) % numBuckets;
+    }
+
+    @Override
+    public DataType[] inputTypes() {
+      return new DataType[] {DataTypes.IntegerType, DataTypes.FloatType};
+    }
+
+    @Override
+    public String canonicalName() {
+      return "iceberg.bucket(float)";
+    }
+
+    @Override
+    public Integer produceResult(InternalRow input) {
+      return input.isNullAt(NUM_BUCKETS_ORDINAL) || input.isNullAt(VALUE_ORDINAL)
+          ? null
+          : invoke(input.getInt(NUM_BUCKETS_ORDINAL), input.getFloat(VALUE_ORDINAL));
+    }
+  }
+
+  public static class BucketString extends BucketBase {
+    // magic function for usage with codegen
+    public static Integer invoke(int numBuckets, UTF8String value) {
+      if (value == null) {
+        return null;
+      }
+
+      return (BucketUtil.hashCharSequence(value.toString()) & Integer.MAX_VALUE) % numBuckets;
+    }
+
+    @Override
+    public DataType[] inputTypes() {
+      return new DataType[] {DataTypes.IntegerType, DataTypes.StringType};
+    }
+
+    @Override
+    public String canonicalName() {
+      return "iceberg.bucket(string)";
+    }
+
+    @Override
+    public Integer produceResult(InternalRow input) {
+      return input.isNullAt(NUM_BUCKETS_ORDINAL) || input.isNullAt(VALUE_ORDINAL)
+          ? null
+          : invoke(input.getInt(NUM_BUCKETS_ORDINAL), input.getUTF8String(VALUE_ORDINAL));

Review Comment:
   The other PR avoided checking twice whether the value was null. Minor though.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] kbendick commented on a diff in pull request #5513: Spark 3.3 - Support bucket in FunctionCatalog

Posted by GitBox <gi...@apache.org>.
kbendick commented on code in PR #5513:
URL: https://github.com/apache/iceberg/pull/5513#discussion_r945207187


##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/functions/BucketFunction.java:
##########
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.functions;
+
+import java.nio.ByteBuffer;
+import java.util.Set;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
+import org.apache.iceberg.util.BucketUtil;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.connector.catalog.functions.BoundFunction;
+import org.apache.spark.sql.connector.catalog.functions.ScalarFunction;
+import org.apache.spark.sql.connector.catalog.functions.UnboundFunction;
+import org.apache.spark.sql.types.BinaryType;
+import org.apache.spark.sql.types.ByteType;
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.DateType;
+import org.apache.spark.sql.types.Decimal;
+import org.apache.spark.sql.types.DecimalType;
+import org.apache.spark.sql.types.IntegerType;
+import org.apache.spark.sql.types.LongType;
+import org.apache.spark.sql.types.ShortType;
+import org.apache.spark.sql.types.StringType;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.types.TimestampType;
+import org.apache.spark.unsafe.types.UTF8String;
+
+/**
+ * A Spark function implementation for the Iceberg bucket transform.
+ *
+ * <p>Example usage: {@code SELECT system.bucket(128, 'abc')}, which returns the bucket 122.
+ *
+ * <p>Note that for performance reasons, the given input number of buckets is not validated in the
+ * implementations used in code-gen. The number of buckets must be positive to give meaningful
+ * results.
+ */
+public class BucketFunction implements UnboundFunction {
+  private static final int NUM_BUCKETS_ORDINAL = 0;
+  private static final int VALUE_ORDINAL = 1;
+  private static final Set<DataType> SUPPORTED_NUM_BUCKETS_TYPES =
+      ImmutableSet.of(DataTypes.ByteType, DataTypes.ShortType, DataTypes.IntegerType);
+
+  @Override
+  public BoundFunction bind(StructType inputType) {
+    if (inputType.size() != 2) {
+      throw new UnsupportedOperationException(
+          "Wrong number of inputs (expected numBuckets and value)");
+    }
+
+    StructField numBucketsField = inputType.fields()[NUM_BUCKETS_ORDINAL];
+    StructField valueField = inputType.fields()[VALUE_ORDINAL];
+
+    if (!SUPPORTED_NUM_BUCKETS_TYPES.contains(numBucketsField.dataType())) {
+      throw new UnsupportedOperationException(
+          "Expected number of buckets to be tinyint, shortint or int");
+    }
+
+    // TODO - Support UUID type.

Review Comment:
   Cool. That's why I skipped it (along with some other known issues with the UUID type in Spark).
   
   Will remove the TODO.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] kbendick commented on a diff in pull request #5513: Spark 3.3 - Support bucket in FunctionCatalog

Posted by GitBox <gi...@apache.org>.
kbendick commented on code in PR #5513:
URL: https://github.com/apache/iceberg/pull/5513#discussion_r946012812


##########
api/src/main/java/org/apache/iceberg/util/BucketUtil.java:
##########
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.util;
+
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.UUID;
+import org.apache.iceberg.relocated.com.google.common.hash.HashFunction;
+import org.apache.iceberg.relocated.com.google.common.hash.Hashing;
+
+/**
+ * Contains the logic for hashing various types for use with the {@code bucket} partition
+ * transformations
+ */
+public class BucketUtil {
+
+  private static final HashFunction MURMUR3 = Hashing.murmur3_32_fixed();
+
+  private BucketUtil() {}
+
+  public static int hashInteger(Integer value) {

Review Comment:
   Updated all to `hash` and changed to use unboxed types.
   
   I might be able to remove some redundant definitions. I'll check.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] kbendick commented on a diff in pull request #5513: Spark 3.3 - Support bucket in FunctionCatalog

Posted by GitBox <gi...@apache.org>.
kbendick commented on code in PR #5513:
URL: https://github.com/apache/iceberg/pull/5513#discussion_r945207438


##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/functions/BucketFunction.java:
##########
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.functions;
+
+import java.nio.ByteBuffer;
+import java.util.Set;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
+import org.apache.iceberg.util.BucketUtil;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.connector.catalog.functions.BoundFunction;
+import org.apache.spark.sql.connector.catalog.functions.ScalarFunction;
+import org.apache.spark.sql.connector.catalog.functions.UnboundFunction;
+import org.apache.spark.sql.types.BinaryType;
+import org.apache.spark.sql.types.ByteType;
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.DateType;
+import org.apache.spark.sql.types.Decimal;
+import org.apache.spark.sql.types.DecimalType;
+import org.apache.spark.sql.types.IntegerType;
+import org.apache.spark.sql.types.LongType;
+import org.apache.spark.sql.types.ShortType;
+import org.apache.spark.sql.types.StringType;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.types.TimestampType;
+import org.apache.spark.unsafe.types.UTF8String;
+
+/**
+ * A Spark function implementation for the Iceberg bucket transform.
+ *
+ * <p>Example usage: {@code SELECT system.bucket(128, 'abc')}, which returns the bucket 122.
+ *
+ * <p>Note that for performance reasons, the given input number of buckets is not validated in the
+ * implementations used in code-gen. The number of buckets must be positive to give meaningful
+ * results.
+ */
+public class BucketFunction implements UnboundFunction {
+  private static final int NUM_BUCKETS_ORDINAL = 0;
+  private static final int VALUE_ORDINAL = 1;
+  private static final Set<DataType> SUPPORTED_NUM_BUCKETS_TYPES =
+      ImmutableSet.of(DataTypes.ByteType, DataTypes.ShortType, DataTypes.IntegerType);
+
+  @Override
+  public BoundFunction bind(StructType inputType) {
+    if (inputType.size() != 2) {
+      throw new UnsupportedOperationException(
+          "Wrong number of inputs (expected numBuckets and value)");
+    }
+
+    StructField numBucketsField = inputType.fields()[NUM_BUCKETS_ORDINAL];
+    StructField valueField = inputType.fields()[VALUE_ORDINAL];
+
+    if (!SUPPORTED_NUM_BUCKETS_TYPES.contains(numBucketsField.dataType())) {
+      throw new UnsupportedOperationException(
+          "Expected number of buckets to be tinyint, shortint or int");
+    }
+
+    // TODO - Support UUID type.
+    DataType type = valueField.dataType();
+    if (type instanceof DateType) {
+      return new BucketInt(DataTypes.DateType);
+    } else if (type instanceof ByteType
+        || type instanceof ShortType
+        || type instanceof IntegerType) {
+      return new BucketInt(DataTypes.IntegerType);
+    } else if (type instanceof LongType) {
+      return new BucketLong(DataTypes.LongType);
+    } else if (type instanceof TimestampType) {
+      return new BucketLong(DataTypes.TimestampType);
+    } else if (type instanceof DecimalType) {
+      return new BucketDecimal(((DecimalType) type).precision(), ((DecimalType) type).scale());
+    } else if (type instanceof StringType) {
+      return new BucketString();
+    } else if (type instanceof BinaryType) {
+      return new BucketBinary();
+    } else {
+      throw new UnsupportedOperationException(
+          "Expected bucketed column to be date, tinyint, smallint, int, bigint, decimal, timestamp, string, or binary");
+    }
+  }
+
+  @Override
+  public String description() {
+    return name()
+        + "(numBuckets, col) - Call Iceberg's bucket transform\n"
+        + "  numBuckets :: number of buckets to divide the rows into, e.g. bucket(100, 34) -> 79 (must be a tinyint, smallint, or int)\n"
+        + "  col :: column to bucket (must be a date, integer, long, timestamp, decimal, string, or binary)";
+  }
+
+  @Override
+  public String name() {
+    return "bucket";
+  }
+
+  public abstract static class BucketBase implements ScalarFunction<Integer> {
+    @Override
+    public String name() {
+      return "bucket";
+    }
+
+    @Override
+    public DataType resultType() {
+      return DataTypes.IntegerType;
+    }
+  }
+
+  // Used for both int and date - tinyint and smallint are upcasted to int by Spark.
+  public static class BucketInt extends BucketBase {
+    private final DataType sqlType;
+
+    // magic method used in codegen
+    public static int invoke(int numBuckets, int value) {
+      return (BucketUtil.hashInteger(value) & Integer.MAX_VALUE) % numBuckets;

Review Comment:
   Yeah, I believe so. I originally had the subclasses using a templated type in an earlier design, but too many things needed "static" copies due to the inability to use static with templated types.
   
   I encountered some issues in a previous design with Spark not sending the generic `produceResult` definition to executors when it was in the base class, but I think with a static function that shouldn't happen.
   
   Will try to make it work tonight now that there's no template type.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] kbendick commented on a diff in pull request #5513: Spark 3.3 - Support bucket in FunctionCatalog

Posted by GitBox <gi...@apache.org>.
kbendick commented on code in PR #5513:
URL: https://github.com/apache/iceberg/pull/5513#discussion_r947192874


##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/functions/BucketFunction.java:
##########
@@ -0,0 +1,310 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.functions;
+
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.util.Set;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
+import org.apache.iceberg.util.BucketUtil;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.connector.catalog.functions.BoundFunction;
+import org.apache.spark.sql.connector.catalog.functions.ScalarFunction;
+import org.apache.spark.sql.connector.catalog.functions.UnboundFunction;
+import org.apache.spark.sql.types.BinaryType;
+import org.apache.spark.sql.types.ByteType;
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.DateType;
+import org.apache.spark.sql.types.Decimal;
+import org.apache.spark.sql.types.DecimalType;
+import org.apache.spark.sql.types.IntegerType;
+import org.apache.spark.sql.types.LongType;
+import org.apache.spark.sql.types.ShortType;
+import org.apache.spark.sql.types.StringType;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.types.TimestampType;
+import org.apache.spark.unsafe.types.UTF8String;
+
+/**
+ * A Spark function implementation for the Iceberg bucket transform.
+ *
+ * <p>Example usage: {@code SELECT system.bucket(128, 'abc')}, which returns the bucket 122.
+ *
+ * <p>Note that for performance reasons, the given input number of buckets is not validated in the
+ * implementations used in code-gen. The number of buckets must be positive to give meaningful
+ * results.
+ */
+public class BucketFunction implements UnboundFunction {
+  private static final int NUM_BUCKETS_ORDINAL = 0;
+  private static final int VALUE_ORDINAL = 1;
+  private static final Set<DataType> SUPPORTED_NUM_BUCKETS_TYPES =
+      ImmutableSet.of(DataTypes.ByteType, DataTypes.ShortType, DataTypes.IntegerType);
+
+  @Override
+  public BoundFunction bind(StructType inputType) {
+    if (inputType.size() != 2) {
+      throw new UnsupportedOperationException(
+          "Wrong number of inputs (expected numBuckets and value)");
+    }
+
+    StructField numBucketsField = inputType.fields()[NUM_BUCKETS_ORDINAL];
+    StructField valueField = inputType.fields()[VALUE_ORDINAL];
+
+    if (!SUPPORTED_NUM_BUCKETS_TYPES.contains(numBucketsField.dataType())) {
+      throw new UnsupportedOperationException(
+          "Expected number of buckets to be tinyint, shortint or int");
+    }
+
+    DataType type = valueField.dataType();
+    if (type instanceof DateType) {
+      return new BucketInt(type);
+    } else if (type instanceof ByteType
+        || type instanceof ShortType
+        || type instanceof IntegerType) {
+      return new BucketInt(DataTypes.IntegerType);
+    } else if (type instanceof LongType) {
+      return new BucketLong(type);
+    } else if (type instanceof TimestampType) {
+      return new BucketLong(type);
+    } else if (type instanceof DecimalType) {
+      return new BucketDecimal(type);
+    } else if (type instanceof StringType) {
+      return new BucketString();
+    } else if (type instanceof BinaryType) {
+      return new BucketBinary();
+    } else {
+      throw new UnsupportedOperationException(
+          "Expected column to be date, tinyint, smallint, int, bigint, decimal, timestamp, string, or binary");
+    }
+  }
+
+  @Override
+  public String description() {
+    return name()
+        + "(numBuckets, col) - Call Iceberg's bucket transform\n"
+        + "  numBuckets :: number of buckets to divide the rows into, e.g. bucket(100, 34) -> 79 (must be a tinyint, smallint, or int)\n"
+        + "  col :: column to bucket (must be a date, integer, long, timestamp, decimal, string, or binary)";
+  }
+
+  @Override
+  public String name() {
+    return "bucket";
+  }
+
+  public abstract static class BucketBase implements ScalarFunction<Integer> {
+    public static int apply(int numBuckets, int hashedValue) {
+      return (hashedValue & Integer.MAX_VALUE) % numBuckets;
+    }
+
+    @Override
+    public String name() {
+      return "bucket";
+    }
+
+    @Override
+    public DataType resultType() {
+      return DataTypes.IntegerType;
+    }
+  }
+
+  // Used for both int and date - tinyint and smallint are upcasted to int by Spark.
+  public static class BucketInt extends BucketBase {
+    private final DataType sqlType;
+
+    // magic method used in codegen
+    public static int invoke(int numBuckets, int value) {
+      return apply(numBuckets, hash(value));
+    }
+
+    // Visible for testing
+    public static int hash(int value) {
+      return BucketUtil.hash(value);
+    }
+
+    public BucketInt(DataType sqlType) {
+      this.sqlType = sqlType;
+    }
+
+    @Override
+    public DataType[] inputTypes() {
+      return new DataType[] {DataTypes.IntegerType, sqlType};
+    }
+
+    @Override
+    public String canonicalName() {
+      return String.format("iceberg.bucket(%s)", sqlType.catalogString());
+    }
+
+    @Override
+    public Integer produceResult(InternalRow input) {
+      // return null for null input to match what Spark does in the code-generated versions.
+      return input.isNullAt(NUM_BUCKETS_ORDINAL) || input.isNullAt(VALUE_ORDINAL)
+          ? null
+          : invoke(input.getInt(NUM_BUCKETS_ORDINAL), input.getInt(VALUE_ORDINAL));
+    }
+  }
+
+  // Used for both BigInt and Timestamp
+  public static class BucketLong extends BucketBase {
+    private final DataType sqlType;
+
+    // magic function for usage with codegen - needs to be static
+    public static int invoke(int numBuckets, long value) {
+      return apply(numBuckets, hash(value));
+    }
+
+    // Visible for testing
+    public static int hash(long value) {
+      return BucketUtil.hash(value);
+    }
+
+    public BucketLong(DataType sqlType) {
+      this.sqlType = sqlType;
+    }
+
+    @Override
+    public DataType[] inputTypes() {
+      return new DataType[] {DataTypes.IntegerType, sqlType};
+    }
+
+    @Override
+    public String canonicalName() {
+      return String.format("iceberg.bucket(%s)", sqlType.catalogString());
+    }
+
+    @Override
+    public Integer produceResult(InternalRow input) {
+      return input.isNullAt(NUM_BUCKETS_ORDINAL) || input.isNullAt(VALUE_ORDINAL)
+          ? null
+          : invoke(input.getInt(NUM_BUCKETS_ORDINAL), input.getLong(VALUE_ORDINAL));
+    }
+  }
+
+  public static class BucketString extends BucketBase {
+    // magic function for usage with codegen
+    public static Integer invoke(int numBuckets, UTF8String value) {
+      if (value == null) {
+        return null;
+      }
+
+      // TODO - We can probably hash the bytes directly given they're already UTF-8 input.
+      return apply(numBuckets, hash(value.toString()));

Review Comment:
   Yeah if we change `hash` here to be `public static int hash(ByteBuffer value)` we could then `return apply(numBuckets, hash(value.getByteBuffer()));` which returns the same results.
   
   I think this change is worth it, though it makes the tests _slightly_ wonkier for the test suite that checks the `hash` function output directly, as we have to call `ByteBuffer.wrap("iceberg".getBytes("UTF-8"))`.
   
   But it would arguably make the bucket string function faster on UTF8String input which is what will get passed in at runtime.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] kbendick commented on a diff in pull request #5513: Spark 3.3 - Support bucket in FunctionCatalog

Posted by GitBox <gi...@apache.org>.
kbendick commented on code in PR #5513:
URL: https://github.com/apache/iceberg/pull/5513#discussion_r947184432


##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/functions/BucketFunction.java:
##########
@@ -0,0 +1,310 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.functions;
+
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.util.Set;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
+import org.apache.iceberg.util.BucketUtil;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.connector.catalog.functions.BoundFunction;
+import org.apache.spark.sql.connector.catalog.functions.ScalarFunction;
+import org.apache.spark.sql.connector.catalog.functions.UnboundFunction;
+import org.apache.spark.sql.types.BinaryType;
+import org.apache.spark.sql.types.ByteType;
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.DateType;
+import org.apache.spark.sql.types.Decimal;
+import org.apache.spark.sql.types.DecimalType;
+import org.apache.spark.sql.types.IntegerType;
+import org.apache.spark.sql.types.LongType;
+import org.apache.spark.sql.types.ShortType;
+import org.apache.spark.sql.types.StringType;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.types.TimestampType;
+import org.apache.spark.unsafe.types.UTF8String;
+
+/**
+ * A Spark function implementation for the Iceberg bucket transform.
+ *
+ * <p>Example usage: {@code SELECT system.bucket(128, 'abc')}, which returns the bucket 122.
+ *
+ * <p>Note that for performance reasons, the given input number of buckets is not validated in the
+ * implementations used in code-gen. The number of buckets must be positive to give meaningful
+ * results.
+ */
+public class BucketFunction implements UnboundFunction {
+  private static final int NUM_BUCKETS_ORDINAL = 0;
+  private static final int VALUE_ORDINAL = 1;
+  private static final Set<DataType> SUPPORTED_NUM_BUCKETS_TYPES =
+      ImmutableSet.of(DataTypes.ByteType, DataTypes.ShortType, DataTypes.IntegerType);
+
+  @Override
+  public BoundFunction bind(StructType inputType) {
+    if (inputType.size() != 2) {
+      throw new UnsupportedOperationException(
+          "Wrong number of inputs (expected numBuckets and value)");
+    }
+
+    StructField numBucketsField = inputType.fields()[NUM_BUCKETS_ORDINAL];
+    StructField valueField = inputType.fields()[VALUE_ORDINAL];
+
+    if (!SUPPORTED_NUM_BUCKETS_TYPES.contains(numBucketsField.dataType())) {
+      throw new UnsupportedOperationException(
+          "Expected number of buckets to be tinyint, shortint or int");
+    }
+
+    DataType type = valueField.dataType();
+    if (type instanceof DateType) {
+      return new BucketInt(type);
+    } else if (type instanceof ByteType
+        || type instanceof ShortType
+        || type instanceof IntegerType) {
+      return new BucketInt(DataTypes.IntegerType);
+    } else if (type instanceof LongType) {
+      return new BucketLong(type);
+    } else if (type instanceof TimestampType) {
+      return new BucketLong(type);
+    } else if (type instanceof DecimalType) {
+      return new BucketDecimal(type);
+    } else if (type instanceof StringType) {
+      return new BucketString();
+    } else if (type instanceof BinaryType) {
+      return new BucketBinary();
+    } else {
+      throw new UnsupportedOperationException(
+          "Expected column to be date, tinyint, smallint, int, bigint, decimal, timestamp, string, or binary");
+    }
+  }
+
+  @Override
+  public String description() {
+    return name()
+        + "(numBuckets, col) - Call Iceberg's bucket transform\n"
+        + "  numBuckets :: number of buckets to divide the rows into, e.g. bucket(100, 34) -> 79 (must be a tinyint, smallint, or int)\n"
+        + "  col :: column to bucket (must be a date, integer, long, timestamp, decimal, string, or binary)";
+  }
+
+  @Override
+  public String name() {
+    return "bucket";
+  }
+
+  public abstract static class BucketBase implements ScalarFunction<Integer> {
+    public static int apply(int numBuckets, int hashedValue) {
+      return (hashedValue & Integer.MAX_VALUE) % numBuckets;
+    }
+
+    @Override
+    public String name() {
+      return "bucket";
+    }
+
+    @Override
+    public DataType resultType() {
+      return DataTypes.IntegerType;
+    }
+  }
+
+  // Used for both int and date - tinyint and smallint are upcasted to int by Spark.
+  public static class BucketInt extends BucketBase {
+    private final DataType sqlType;
+
+    // magic method used in codegen
+    public static int invoke(int numBuckets, int value) {
+      return apply(numBuckets, hash(value));
+    }
+
+    // Visible for testing
+    public static int hash(int value) {
+      return BucketUtil.hash(value);
+    }
+
+    public BucketInt(DataType sqlType) {
+      this.sqlType = sqlType;
+    }
+
+    @Override
+    public DataType[] inputTypes() {
+      return new DataType[] {DataTypes.IntegerType, sqlType};
+    }
+
+    @Override
+    public String canonicalName() {
+      return String.format("iceberg.bucket(%s)", sqlType.catalogString());
+    }
+
+    @Override
+    public Integer produceResult(InternalRow input) {
+      // return null for null input to match what Spark does in the code-generated versions.
+      return input.isNullAt(NUM_BUCKETS_ORDINAL) || input.isNullAt(VALUE_ORDINAL)
+          ? null
+          : invoke(input.getInt(NUM_BUCKETS_ORDINAL), input.getInt(VALUE_ORDINAL));
+    }
+  }
+
+  // Used for both BigInt and Timestamp
+  public static class BucketLong extends BucketBase {
+    private final DataType sqlType;
+
+    // magic function for usage with codegen - needs to be static
+    public static int invoke(int numBuckets, long value) {
+      return apply(numBuckets, hash(value));
+    }
+
+    // Visible for testing
+    public static int hash(long value) {
+      return BucketUtil.hash(value);
+    }
+
+    public BucketLong(DataType sqlType) {
+      this.sqlType = sqlType;
+    }
+
+    @Override
+    public DataType[] inputTypes() {
+      return new DataType[] {DataTypes.IntegerType, sqlType};
+    }
+
+    @Override
+    public String canonicalName() {
+      return String.format("iceberg.bucket(%s)", sqlType.catalogString());
+    }
+
+    @Override
+    public Integer produceResult(InternalRow input) {
+      return input.isNullAt(NUM_BUCKETS_ORDINAL) || input.isNullAt(VALUE_ORDINAL)
+          ? null
+          : invoke(input.getInt(NUM_BUCKETS_ORDINAL), input.getLong(VALUE_ORDINAL));
+    }
+  }
+
+  public static class BucketString extends BucketBase {
+    // magic function for usage with codegen
+    public static Integer invoke(int numBuckets, UTF8String value) {
+      if (value == null) {
+        return null;
+      }
+
+      // TODO - We can probably hash the bytes directly given they're already UTF-8 input.
+      return apply(numBuckets, hash(value.toString()));

Review Comment:
   I think since this is already a UTF-8 string, we can hash the bytes directly instead of converting to a Java String (which is UTF-16, for CharSequence), then converting that back to UTF-8 bytes, and then hashing.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] kbendick commented on pull request #5513: Spark 3.3 - Support bucket in FunctionCatalog

Posted by GitBox <gi...@apache.org>.
kbendick commented on PR #5513:
URL: https://github.com/apache/iceberg/pull/5513#issuecomment-1217080414

   @rdblue is there anything else you want me to update before this is ready to merge, or are we just giving other people time to review?
   
   Also cc @huaxingao @aokolnychyi: this adds the Spark `bucket` function, which can be used for storage partitioned joins on `bucket` columns. PTAL. 🙂 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a diff in pull request #5513: Spark 3.3 - Support bucket in FunctionCatalog

Posted by GitBox <gi...@apache.org>.
rdblue commented on code in PR #5513:
URL: https://github.com/apache/iceberg/pull/5513#discussion_r945319871


##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/functions/BucketFunction.java:
##########
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.functions;
+
+import java.nio.ByteBuffer;
+import java.util.Set;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
+import org.apache.iceberg.util.BucketUtil;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.connector.catalog.functions.BoundFunction;
+import org.apache.spark.sql.connector.catalog.functions.ScalarFunction;
+import org.apache.spark.sql.connector.catalog.functions.UnboundFunction;
+import org.apache.spark.sql.types.BinaryType;
+import org.apache.spark.sql.types.ByteType;
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.DateType;
+import org.apache.spark.sql.types.Decimal;
+import org.apache.spark.sql.types.DecimalType;
+import org.apache.spark.sql.types.IntegerType;
+import org.apache.spark.sql.types.LongType;
+import org.apache.spark.sql.types.ShortType;
+import org.apache.spark.sql.types.StringType;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.types.TimestampType;
+import org.apache.spark.unsafe.types.UTF8String;
+
+/**
+ * A Spark function implementation for the Iceberg bucket transform.
+ *
+ * <p>Example usage: {@code SELECT system.bucket(128, 'abc')}, which returns the bucket 122.
+ *
+ * <p>Note that for performance reasons, the given input number of buckets is not validated in the
+ * implementations used in code-gen. The number of buckets must be positive to give meaningful
+ * results.
+ */
+public class BucketFunction implements UnboundFunction {
+  private static final int NUM_BUCKETS_ORDINAL = 0;
+  private static final int VALUE_ORDINAL = 1;
+  private static final Set<DataType> SUPPORTED_NUM_BUCKETS_TYPES =
+      ImmutableSet.of(DataTypes.ByteType, DataTypes.ShortType, DataTypes.IntegerType);
+
+  @Override
+  public BoundFunction bind(StructType inputType) {
+    if (inputType.size() != 2) {
+      throw new UnsupportedOperationException(
+          "Wrong number of inputs (expected numBuckets and value)");
+    }
+
+    StructField numBucketsField = inputType.fields()[NUM_BUCKETS_ORDINAL];
+    StructField valueField = inputType.fields()[VALUE_ORDINAL];
+
+    if (!SUPPORTED_NUM_BUCKETS_TYPES.contains(numBucketsField.dataType())) {
+      throw new UnsupportedOperationException(
+          "Expected number of buckets to be tinyint, shortint or int");
+    }
+
+    // TODO - Support UUID type.
+    DataType type = valueField.dataType();
+    if (type instanceof DateType) {
+      return new BucketInt(DataTypes.DateType);
+    } else if (type instanceof ByteType
+        || type instanceof ShortType
+        || type instanceof IntegerType) {
+      return new BucketInt(DataTypes.IntegerType);
+    } else if (type instanceof LongType) {
+      return new BucketLong(DataTypes.LongType);
+    } else if (type instanceof TimestampType) {
+      return new BucketLong(DataTypes.TimestampType);
+    } else if (type instanceof DecimalType) {
+      return new BucketDecimal(((DecimalType) type).precision(), ((DecimalType) type).scale());
+    } else if (type instanceof StringType) {
+      return new BucketString();
+    } else if (type instanceof BinaryType) {
+      return new BucketBinary();
+    } else {
+      throw new UnsupportedOperationException(
+          "Expected bucketed column to be date, tinyint, smallint, int, bigint, decimal, timestamp, string, or binary");
+    }
+  }
+
+  @Override
+  public String description() {
+    return name()
+        + "(numBuckets, col) - Call Iceberg's bucket transform\n"
+        + "  numBuckets :: number of buckets to divide the rows into, e.g. bucket(100, 34) -> 79 (must be a tinyint, smallint, or int)\n"
+        + "  col :: column to bucket (must be a date, integer, long, timestamp, decimal, string, or binary)";
+  }
+
+  @Override
+  public String name() {
+    return "bucket";
+  }
+
+  public abstract static class BucketBase implements ScalarFunction<Integer> {
+    @Override
+    public String name() {
+      return "bucket";
+    }
+
+    @Override
+    public DataType resultType() {
+      return DataTypes.IntegerType;
+    }
+  }
+
+  // Used for both int and date - tinyint and smallint are upcasted to int by Spark.
+  public static class BucketInt extends BucketBase {
+    private final DataType sqlType;
+
+    // magic method used in codegen
+    public static int invoke(int numBuckets, int value) {
+      return (BucketUtil.hashInteger(value) & Integer.MAX_VALUE) % numBuckets;

Review Comment:
   You could also put this in the base class.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] kbendick commented on a diff in pull request #5513: Spark 3.3 - Support bucket in FunctionCatalog

Posted by GitBox <gi...@apache.org>.
kbendick commented on code in PR #5513:
URL: https://github.com/apache/iceberg/pull/5513#discussion_r944896127


##########
spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/sql/TestSparkBucketFunction.java:
##########
@@ -0,0 +1,330 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.sql;
+
+import java.nio.charset.StandardCharsets;
+import org.apache.iceberg.AssertHelpers;
+import org.apache.iceberg.relocated.com.google.common.io.BaseEncoding;
+import org.apache.iceberg.spark.SparkTestBaseWithCatalog;
+import org.apache.spark.sql.AnalysisException;
+import org.assertj.core.api.Assertions;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestSparkBucketFunction extends SparkTestBaseWithCatalog {
+  public TestSparkBucketFunction() {}
+
+  @Before
+  public void useCatalog() {
+    sql("USE %s", catalogName);
+  }
+
+  @Test
+  public void testBucketIntegers() {
+    Assert.assertEquals(
+        "Byte type should bucket similarly to integer",
+        3,
+        scalarSql("SELECT system.bucket(10, 8Y)"));
+    Assert.assertEquals(
+        "Short type should bucket similarly to integer",
+        3,
+        scalarSql("SELECT system.bucket(10, 8S)"));
+    // Integers
+    Assert.assertEquals(3, scalarSql("SELECT system.bucket(10, 8)"));
+    Assert.assertEquals(79, scalarSql("SELECT system.bucket(100, 34)"));
+    Assert.assertNull(scalarSql("SELECT system.bucket(1, CAST(null AS INT))"));
+  }
+
+  @Test
+  public void testBucketDates() {
+    Assert.assertEquals(3, scalarSql("SELECT system.bucket(10, date('1970-01-09'))"));
+    Assert.assertEquals(79, scalarSql("SELECT system.bucket(100, date('1970-02-04'))"));
+    Assert.assertNull(scalarSql("SELECT system.bucket(1, CAST(null AS DATE))"));
+  }
+
+  @Test
+  public void testBucketLong() {
+    Assert.assertEquals(79, scalarSql("SELECT system.bucket(100, 34L)"));
+    Assert.assertEquals(76, scalarSql("SELECT system.bucket(100, 0L)"));
+    Assert.assertEquals(97, scalarSql("SELECT system.bucket(100, -34L)"));
+    Assert.assertEquals(0, scalarSql("SELECT system.bucket(2, -1L)"));
+    Assert.assertNull(scalarSql("SELECT system.bucket(2, CAST(null AS LONG))"));
+  }
+
+  @Test
+  public void testBucketDecimal() {
+    Assert.assertEquals(56, scalarSql("SELECT system.bucket(64, CAST('12.34' as DECIMAL(9, 2)))"));
+    Assert.assertEquals(13, scalarSql("SELECT system.bucket(18, CAST('12.30' as DECIMAL(9, 2)))"));
+    Assert.assertEquals(2, scalarSql("SELECT system.bucket(16, CAST('12.999' as DECIMAL(9, 3)))"));
+    Assert.assertEquals(21, scalarSql("SELECT system.bucket(32, CAST('0.05' as DECIMAL(5, 2)))"));
+    Assert.assertEquals(85, scalarSql("SELECT system.bucket(128, CAST('0.05' as DECIMAL(9, 2)))"));
+    Assert.assertEquals(3, scalarSql("SELECT system.bucket(18, CAST('0.05' as DECIMAL(9, 2)))"));
+
+    Assert.assertNull(
+        "Null input should return null",
+        scalarSql("SELECT system.bucket(2, CAST(null AS decimal))"));
+  }
+
+  @Test
+  public void testBucketTimestamp() {
+    Assert.assertEquals(
+        99, scalarSql("SELECT system.bucket(100, TIMESTAMP '1997-01-01 00:00:00 UTC+00:00')"));

Review Comment:
   Updated this timestamp literal to include a timezone so that the test isn't flaky. We might want to set the default timezone in the base test class, but that could upset other tests. I'll investigate.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a diff in pull request #5513: Spark 3.3 - Support bucket in FunctionCatalog

Posted by GitBox <gi...@apache.org>.
rdblue commented on code in PR #5513:
URL: https://github.com/apache/iceberg/pull/5513#discussion_r945194840


##########
api/src/main/java/org/apache/iceberg/util/BucketUtil.java:
##########
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.util;
+
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.UUID;
+import org.apache.iceberg.relocated.com.google.common.hash.HashFunction;
+import org.apache.iceberg.relocated.com.google.common.hash.Hashing;
+
+/**
+ * Contains the logic for hashing various types for use with the {@code bucket} partition
+ * transformations
+ */
+public class BucketUtil {
+
+  private static final HashFunction MURMUR3 = Hashing.murmur3_32_fixed();
+
+  private BucketUtil() {}
+
+  public static int hashInteger(Integer value) {

Review Comment:
   Why do these names include the type? You should be able to just overload `hash`.
   
   Also, why pass values as boxed types here? `HashFunction` accepts primitives, so using a boxed type here means that you'll have to box values passed to `invoke` just to unbox them here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a diff in pull request #5513: Spark 3.3 - Support bucket in FunctionCatalog

Posted by GitBox <gi...@apache.org>.
rdblue commented on code in PR #5513:
URL: https://github.com/apache/iceberg/pull/5513#discussion_r945206323


##########
spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/sql/TestSparkBucketFunction.java:
##########
@@ -0,0 +1,330 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.sql;
+
+import java.nio.charset.StandardCharsets;
+import org.apache.iceberg.AssertHelpers;
+import org.apache.iceberg.relocated.com.google.common.io.BaseEncoding;
+import org.apache.iceberg.spark.SparkTestBaseWithCatalog;
+import org.apache.spark.sql.AnalysisException;
+import org.assertj.core.api.Assertions;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestSparkBucketFunction extends SparkTestBaseWithCatalog {
+  public TestSparkBucketFunction() {}
+
+  @Before
+  public void useCatalog() {
+    sql("USE %s", catalogName);
+  }
+
+  @Test
+  public void testBucketIntegers() {
+    Assert.assertEquals(
+        "Byte type should bucket similarly to integer",
+        3,
+        scalarSql("SELECT system.bucket(10, 8Y)"));
+    Assert.assertEquals(
+        "Short type should bucket similarly to integer",
+        3,
+        scalarSql("SELECT system.bucket(10, 8S)"));
+    // Integers
+    Assert.assertEquals(3, scalarSql("SELECT system.bucket(10, 8)"));
+    Assert.assertEquals(79, scalarSql("SELECT system.bucket(100, 34)"));
+    Assert.assertNull(scalarSql("SELECT system.bucket(1, CAST(null AS INT))"));
+  }
+
+  @Test
+  public void testBucketDates() {
+    Assert.assertEquals(3, scalarSql("SELECT system.bucket(10, date('1970-01-09'))"));
+    Assert.assertEquals(79, scalarSql("SELECT system.bucket(100, date('1970-02-04'))"));
+    Assert.assertNull(scalarSql("SELECT system.bucket(1, CAST(null AS DATE))"));
+  }
+
+  @Test
+  public void testBucketLong() {
+    Assert.assertEquals(79, scalarSql("SELECT system.bucket(100, 34L)"));
+    Assert.assertEquals(76, scalarSql("SELECT system.bucket(100, 0L)"));
+    Assert.assertEquals(97, scalarSql("SELECT system.bucket(100, -34L)"));
+    Assert.assertEquals(0, scalarSql("SELECT system.bucket(2, -1L)"));
+    Assert.assertNull(scalarSql("SELECT system.bucket(2, CAST(null AS LONG))"));
+  }
+
+  @Test
+  public void testBucketDecimal() {
+    Assert.assertEquals(56, scalarSql("SELECT system.bucket(64, CAST('12.34' as DECIMAL(9, 2)))"));
+    Assert.assertEquals(13, scalarSql("SELECT system.bucket(18, CAST('12.30' as DECIMAL(9, 2)))"));
+    Assert.assertEquals(2, scalarSql("SELECT system.bucket(16, CAST('12.999' as DECIMAL(9, 3)))"));
+    Assert.assertEquals(21, scalarSql("SELECT system.bucket(32, CAST('0.05' as DECIMAL(5, 2)))"));
+    Assert.assertEquals(85, scalarSql("SELECT system.bucket(128, CAST('0.05' as DECIMAL(9, 2)))"));
+    Assert.assertEquals(3, scalarSql("SELECT system.bucket(18, CAST('0.05' as DECIMAL(9, 2)))"));
+
+    Assert.assertNull(
+        "Null input should return null",
+        scalarSql("SELECT system.bucket(2, CAST(null AS decimal))"));
+  }
+
+  @Test
+  public void testBucketTimestamp() {
+    Assert.assertEquals(
+        99, scalarSql("SELECT system.bucket(100, TIMESTAMP '1997-01-01 00:00:00 UTC+00:00')"));
+    Assert.assertEquals(
+        85, scalarSql("SELECT system.bucket(100, TIMESTAMP '1997-01-31 09:26:56 UTC+00:00')"));
+    Assert.assertEquals(
+        62, scalarSql("SELECT system.bucket(100, TIMESTAMP '2022-08-08 00:00:00 UTC+00:00')"));
+    Assert.assertNull(scalarSql("SELECT system.bucket(2, CAST(null AS timestamp))"));
+  }
+
+  @Test
+  public void testBucketString() {
+    Assert.assertEquals(4, scalarSql("SELECT system.bucket(5, 'abcdefg')"));
+    Assert.assertEquals(122, scalarSql("SELECT system.bucket(128, 'abc')"));
+    Assert.assertEquals(54, scalarSql("SELECT system.bucket(64, 'abcde')"));
+    Assert.assertEquals(8, scalarSql("SELECT system.bucket(12, '测试')"));
+    Assert.assertEquals(1, scalarSql("SELECT system.bucket(16, '测试raul试测')"));
+    Assert.assertEquals(
+        "Varchar should work like string",
+        1,
+        scalarSql("SELECT system.bucket(16, CAST('测试raul试测' AS varchar(8)))"));
+    Assert.assertEquals(
+        "Char should work like string",
+        1,
+        scalarSql("SELECT system.bucket(16, CAST('测试raul试测' AS char(8)))"));
+    Assert.assertEquals(
+        "Should not fail on the empty string", 0, scalarSql("SELECT system.bucket(16, '')"));
+    Assert.assertNull(
+        "Null input should return null as output",
+        scalarSql("SELECT system.bucket(16, CAST(null AS string))"));
+  }
+
+  @Test
+  public void testBucketBinary() {
+    Assert.assertEquals(
+        1, scalarSql("SELECT system.bucket(10, X'0102030405060708090a0b0c0d0e0f')"));
+    Assert.assertEquals(10, scalarSql("SELECT system.bucket(12, %s)", asBytesLiteral("abcdefg")));
+    Assert.assertEquals(13, scalarSql("SELECT system.bucket(18, %s)", asBytesLiteral("abc\0\0")));
+    Assert.assertEquals(42, scalarSql("SELECT system.bucket(48, %s)", asBytesLiteral("abc")));
+    Assert.assertEquals(3, scalarSql("SELECT system.bucket(16, %s)", asBytesLiteral("测试_")));

Review Comment:
   Are these all of the tests from the bucket function tests?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] kbendick commented on a diff in pull request #5513: Spark 3.3 - Support bucket in FunctionCatalog

Posted by GitBox <gi...@apache.org>.
kbendick commented on code in PR #5513:
URL: https://github.com/apache/iceberg/pull/5513#discussion_r946109646


##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/functions/BucketFunction.java:
##########
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.functions;
+
+import java.nio.ByteBuffer;
+import java.util.Set;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
+import org.apache.iceberg.util.BucketUtil;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.connector.catalog.functions.BoundFunction;
+import org.apache.spark.sql.connector.catalog.functions.ScalarFunction;
+import org.apache.spark.sql.connector.catalog.functions.UnboundFunction;
+import org.apache.spark.sql.types.BinaryType;
+import org.apache.spark.sql.types.ByteType;
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.DateType;
+import org.apache.spark.sql.types.Decimal;
+import org.apache.spark.sql.types.DecimalType;
+import org.apache.spark.sql.types.IntegerType;
+import org.apache.spark.sql.types.LongType;
+import org.apache.spark.sql.types.ShortType;
+import org.apache.spark.sql.types.StringType;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.types.TimestampType;
+import org.apache.spark.unsafe.types.UTF8String;
+
+/**
+ * A Spark function implementation for the Iceberg bucket transform.
+ *
+ * <p>Example usage: {@code SELECT system.bucket(128, 'abc')}, which returns the bucket 122.
+ *
+ * <p>Note that for performance reasons, the given input number of buckets is not validated in the
+ * implementations used in code-gen. The number of buckets must be positive to give meaningful
+ * results.
+ */
+public class BucketFunction implements UnboundFunction {
+  private static final int NUM_BUCKETS_ORDINAL = 0;
+  private static final int VALUE_ORDINAL = 1;
+  private static final Set<DataType> SUPPORTED_NUM_BUCKETS_TYPES =
+      ImmutableSet.of(DataTypes.ByteType, DataTypes.ShortType, DataTypes.IntegerType);
+
+  @Override
+  public BoundFunction bind(StructType inputType) {
+    if (inputType.size() != 2) {
+      throw new UnsupportedOperationException(
+          "Wrong number of inputs (expected numBuckets and value)");
+    }
+
+    StructField numBucketsField = inputType.fields()[NUM_BUCKETS_ORDINAL];
+    StructField valueField = inputType.fields()[VALUE_ORDINAL];
+
+    if (!SUPPORTED_NUM_BUCKETS_TYPES.contains(numBucketsField.dataType())) {
+      throw new UnsupportedOperationException(
+          "Expected number of buckets to be tinyint, shortint or int");
+    }
+
+    // TODO - Support UUID type.
+    DataType type = valueField.dataType();
+    if (type instanceof DateType) {
+      return new BucketInt(DataTypes.DateType);
+    } else if (type instanceof ByteType
+        || type instanceof ShortType
+        || type instanceof IntegerType) {
+      return new BucketInt(DataTypes.IntegerType);
+    } else if (type instanceof LongType) {
+      return new BucketLong(DataTypes.LongType);
+    } else if (type instanceof TimestampType) {
+      return new BucketLong(DataTypes.TimestampType);
+    } else if (type instanceof DecimalType) {
+      return new BucketDecimal(((DecimalType) type).precision(), ((DecimalType) type).scale());
+    } else if (type instanceof StringType) {
+      return new BucketString();
+    } else if (type instanceof BinaryType) {
+      return new BucketBinary();
+    } else {
+      throw new UnsupportedOperationException(
+          "Expected bucketed column to be date, tinyint, smallint, int, bigint, decimal, timestamp, string, or binary");
+    }
+  }
+
+  @Override
+  public String description() {
+    return name()
+        + "(numBuckets, col) - Call Iceberg's bucket transform\n"
+        + "  numBuckets :: number of buckets to divide the rows into, e.g. bucket(100, 34) -> 79 (must be a tinyint, smallint, or int)\n"
+        + "  col :: column to bucket (must be a date, integer, long, timestamp, decimal, string, or binary)";
+  }
+
+  @Override
+  public String name() {
+    return "bucket";
+  }
+
+  public abstract static class BucketBase implements ScalarFunction<Integer> {
+    @Override
+    public String name() {
+      return "bucket";
+    }
+
+    @Override
+    public DataType resultType() {
+      return DataTypes.IntegerType;
+    }
+  }
+
+  // Used for both int and date - tinyint and smallint are upcasted to int by Spark.
+  public static class BucketInt extends BucketBase {
+    private final DataType sqlType;
+
+    // magic method used in codegen
+    public static int invoke(int numBuckets, int value) {
+      return (BucketUtil.hashInteger(value) & Integer.MAX_VALUE) % numBuckets;

Review Comment:
   Made a static `apply` method in the base class that accepts the hashed int and the number of buckets.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] kbendick commented on a diff in pull request #5513: Spark 3.3 - Support bucket in FunctionCatalog

Posted by GitBox <gi...@apache.org>.
kbendick commented on code in PR #5513:
URL: https://github.com/apache/iceberg/pull/5513#discussion_r945207959


##########
spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/sql/TestSparkBucketFunction.java:
##########
@@ -0,0 +1,330 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.sql;
+
+import java.nio.charset.StandardCharsets;
+import org.apache.iceberg.AssertHelpers;
+import org.apache.iceberg.relocated.com.google.common.io.BaseEncoding;
+import org.apache.iceberg.spark.SparkTestBaseWithCatalog;
+import org.apache.spark.sql.AnalysisException;
+import org.assertj.core.api.Assertions;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestSparkBucketFunction extends SparkTestBaseWithCatalog {
+  public TestSparkBucketFunction() {}
+
+  @Before
+  public void useCatalog() {
+    sql("USE %s", catalogName);
+  }
+
+  @Test
+  public void testBucketIntegers() {
+    Assert.assertEquals(
+        "Byte type should bucket similarly to integer",
+        3,
+        scalarSql("SELECT system.bucket(10, 8Y)"));
+    Assert.assertEquals(
+        "Short type should bucket similarly to integer",
+        3,
+        scalarSql("SELECT system.bucket(10, 8S)"));
+    // Integers
+    Assert.assertEquals(3, scalarSql("SELECT system.bucket(10, 8)"));
+    Assert.assertEquals(79, scalarSql("SELECT system.bucket(100, 34)"));
+    Assert.assertNull(scalarSql("SELECT system.bucket(1, CAST(null AS INT))"));
+  }
+
+  @Test
+  public void testBucketDates() {
+    Assert.assertEquals(3, scalarSql("SELECT system.bucket(10, date('1970-01-09'))"));
+    Assert.assertEquals(79, scalarSql("SELECT system.bucket(100, date('1970-02-04'))"));
+    Assert.assertNull(scalarSql("SELECT system.bucket(1, CAST(null AS DATE))"));
+  }
+
+  @Test
+  public void testBucketLong() {
+    Assert.assertEquals(79, scalarSql("SELECT system.bucket(100, 34L)"));
+    Assert.assertEquals(76, scalarSql("SELECT system.bucket(100, 0L)"));
+    Assert.assertEquals(97, scalarSql("SELECT system.bucket(100, -34L)"));
+    Assert.assertEquals(0, scalarSql("SELECT system.bucket(2, -1L)"));
+    Assert.assertNull(scalarSql("SELECT system.bucket(2, CAST(null AS LONG))"));
+  }
+
+  @Test
+  public void testBucketDecimal() {
+    Assert.assertEquals(56, scalarSql("SELECT system.bucket(64, CAST('12.34' as DECIMAL(9, 2)))"));
+    Assert.assertEquals(13, scalarSql("SELECT system.bucket(18, CAST('12.30' as DECIMAL(9, 2)))"));
+    Assert.assertEquals(2, scalarSql("SELECT system.bucket(16, CAST('12.999' as DECIMAL(9, 3)))"));
+    Assert.assertEquals(21, scalarSql("SELECT system.bucket(32, CAST('0.05' as DECIMAL(5, 2)))"));
+    Assert.assertEquals(85, scalarSql("SELECT system.bucket(128, CAST('0.05' as DECIMAL(9, 2)))"));
+    Assert.assertEquals(3, scalarSql("SELECT system.bucket(18, CAST('0.05' as DECIMAL(9, 2)))"));
+
+    Assert.assertNull(
+        "Null input should return null",
+        scalarSql("SELECT system.bucket(2, CAST(null AS decimal))"));
+  }
+
+  @Test
+  public void testBucketTimestamp() {
+    Assert.assertEquals(
+        99, scalarSql("SELECT system.bucket(100, TIMESTAMP '1997-01-01 00:00:00 UTC+00:00')"));
+    Assert.assertEquals(
+        85, scalarSql("SELECT system.bucket(100, TIMESTAMP '1997-01-31 09:26:56 UTC+00:00')"));
+    Assert.assertEquals(
+        62, scalarSql("SELECT system.bucket(100, TIMESTAMP '2022-08-08 00:00:00 UTC+00:00')"));
+    Assert.assertNull(scalarSql("SELECT system.bucket(2, CAST(null AS timestamp))"));
+  }
+
+  @Test
+  public void testBucketString() {
+    Assert.assertEquals(4, scalarSql("SELECT system.bucket(5, 'abcdefg')"));
+    Assert.assertEquals(122, scalarSql("SELECT system.bucket(128, 'abc')"));
+    Assert.assertEquals(54, scalarSql("SELECT system.bucket(64, 'abcde')"));
+    Assert.assertEquals(8, scalarSql("SELECT system.bucket(12, '测试')"));
+    Assert.assertEquals(1, scalarSql("SELECT system.bucket(16, '测试raul试测')"));
+    Assert.assertEquals(
+        "Varchar should work like string",
+        1,
+        scalarSql("SELECT system.bucket(16, CAST('测试raul试测' AS varchar(8)))"));
+    Assert.assertEquals(
+        "Char should work like string",
+        1,
+        scalarSql("SELECT system.bucket(16, CAST('测试raul试测' AS char(8)))"));
+    Assert.assertEquals(
+        "Should not fail on the empty string", 0, scalarSql("SELECT system.bucket(16, '')"));
+    Assert.assertNull(
+        "Null input should return null as output",
+        scalarSql("SELECT system.bucket(16, CAST(null AS string))"));
+  }
+
+  @Test
+  public void testBucketBinary() {
+    Assert.assertEquals(
+        1, scalarSql("SELECT system.bucket(10, X'0102030405060708090a0b0c0d0e0f')"));
+    Assert.assertEquals(10, scalarSql("SELECT system.bucket(12, %s)", asBytesLiteral("abcdefg")));
+    Assert.assertEquals(13, scalarSql("SELECT system.bucket(18, %s)", asBytesLiteral("abc\0\0")));
+    Assert.assertEquals(42, scalarSql("SELECT system.bucket(48, %s)", asBytesLiteral("abc")));
+    Assert.assertEquals(3, scalarSql("SELECT system.bucket(16, %s)", asBytesLiteral("测试_")));
+
+    Assert.assertNull(
+        "Null input should return null as output",
+        scalarSql("SELECT system.bucket(100, CAST(null AS binary))"));
+  }
+
+  @Test
+  public void testNumBucketsAcceptsShortAndByte() {
+    Assert.assertEquals(
+        "Short types should be usable for the number of buckets field",
+        1,
+        scalarSql("SELECT system.bucket(5S, 1L)"));
+
+    Assert.assertEquals(
+        "Byte types should be allowed for the number of buckets field",
+        1,
+        scalarSql("SELECT system.bucket(5Y, 1)"));
+  }
+
+  @Test
+  public void testWrongNumberOfArguments() {
+    AssertHelpers.assertThrows(
+        "Function resolution should not work with zero arguments",
+        AnalysisException.class,
+        "Function 'bucket' cannot process input: (): Wrong number of inputs (expected numBuckets and value)",
+        () -> scalarSql("SELECT system.bucket()"));
+
+    AssertHelpers.assertThrows(
+        "Function resolution should not work with only one argument",
+        AnalysisException.class,
+        "Function 'bucket' cannot process input: (int): Wrong number of inputs (expected numBuckets and value)",
+        () -> scalarSql("SELECT system.bucket(1)"));
+
+    AssertHelpers.assertThrows(
+        "Function resolution should not work with more than two arguments",
+        AnalysisException.class,
+        "Function 'bucket' cannot process input: (int, bigint, int): Wrong number of inputs (expected numBuckets and value)",
+        () -> scalarSql("SELECT system.bucket(1, 1L, 1)"));
+  }
+
+  @Test
+  public void testInvalidTypesCannotBeUsedForNumberOfBuckets() {
+    AssertHelpers.assertThrows(
+        "Decimal type should not be coercible to the number of buckets",
+        AnalysisException.class,
+        "Function 'bucket' cannot process input: (decimal(9,2), int): Expected number of buckets to be tinyint, shortint or int",
+        () -> scalarSql("SELECT system.bucket(CAST('12.34' as DECIMAL(9, 2)), 10)"));
+
+    AssertHelpers.assertThrows(
+        "Long type should not be coercible to the number of buckets",
+        AnalysisException.class,
+        "Function 'bucket' cannot process input: (bigint, int): Expected number of buckets to be tinyint, shortint or int",
+        () -> scalarSql("SELECT system.bucket(12L, 10)"));
+
+    AssertHelpers.assertThrows(
+        "String type should not be coercible to the number of buckets",
+        AnalysisException.class,
+        "Function 'bucket' cannot process input: (string, int): Expected number of buckets to be tinyint, shortint or int",
+        () -> scalarSql("SELECT system.bucket('5', 10)"));
+
+    AssertHelpers.assertThrows(
+        "Interval year to month  type should not be coercible to the number of buckets",
+        AnalysisException.class,
+        "Function 'bucket' cannot process input: (interval year to month, int): Expected number of buckets to be tinyint, shortint or int",
+        () -> scalarSql("SELECT system.bucket(INTERVAL '100-00' YEAR TO MONTH, 10)"));
+
+    AssertHelpers.assertThrows(
+        "Interval day-time type should not be coercible to the number of buckets",
+        AnalysisException.class,
+        "Function 'bucket' cannot process input: (interval day to second, int): Expected number of buckets to be tinyint, shortint or int",
+        () -> scalarSql("SELECT system.bucket(CAST('11 23:4:0' AS INTERVAL DAY TO SECOND), 10)"));
+  }
+
+  @Test
+  public void testInvalidTypesForBucketColumn() {
+    AssertHelpers.assertThrows(
+        "Double type should not be bucketable",
+        AnalysisException.class,
+        "Function 'bucket' cannot process input: (int, float): Expected bucketed column to be date, tinyint, smallint, int, bigint, decimal, timestamp, string, or binary",
+        () -> scalarSql("SELECT system.bucket(10, cast(12.3456 as float))"));
+
+    AssertHelpers.assertThrows(
+        "Double type should not be bucketable",
+        AnalysisException.class,
+        "Function 'bucket' cannot process input: (int, double): Expected bucketed column to be date, tinyint, smallint, int, bigint, decimal, timestamp, string, or binary",
+        () -> scalarSql("SELECT system.bucket(10, cast(12.3456 as double))"));
+
+    AssertHelpers.assertThrows(
+        "Boolean type should not be bucketable",
+        AnalysisException.class,
+        "Function 'bucket' cannot process input: (int, boolean)",
+        () -> scalarSql("SELECT system.bucket(10, true)"));
+
+    AssertHelpers.assertThrows(
+        "Map types should not be bucketable",
+        AnalysisException.class,
+        "Function 'bucket' cannot process input: (int, map<int,int>)",
+        () -> scalarSql("SELECT system.bucket(10, map(1, 1))"));
+
+    AssertHelpers.assertThrows(
+        "Array types should not be bucketable",
+        AnalysisException.class,
+        "Function 'bucket' cannot process input: (int, array<bigint>)",
+        () -> scalarSql("SELECT system.bucket(10, array(1L))"));
+
+    AssertHelpers.assertThrows(
+        "Interval year-to-month type should not be bucketable",
+        AnalysisException.class,
+        "Function 'bucket' cannot process input: (int, interval year to month)",
+        () -> scalarSql("SELECT system.bucket(10, INTERVAL '100-00' YEAR TO MONTH)"));
+
+    AssertHelpers.assertThrows(
+        "Interval day-time type should not be bucketable",
+        AnalysisException.class,
+        "Function 'bucket' cannot process input: (int, interval day to second)",
+        () -> scalarSql("SELECT system.bucket(10, CAST('11 23:4:0' AS INTERVAL DAY TO SECOND))"));
+  }
+
+  @Test
+  public void testMagicFunctionsResolveForTinyIntAndSmallIntNumberOfBuckets() {
+    // Magic functions have staticinvoke in the explain output.
+    // Nonmagic calls use applyfunctionexpression instead and go through produceResults.
+    String tinyIntWidthExplain = (String) scalarSql("EXPLAIN EXTENDED SELECT system.bucket(1Y, 6)");
+    Assertions.assertThat(tinyIntWidthExplain)
+        .contains("cast(1 as int)")
+        .contains("staticinvoke(class org.apache.iceberg.spark.functions.BucketFunction$BucketInt");

Review Comment:
   I'll make sure there are tests for the result when using smallint / tinyint for the number of buckets.
   
   This test is just to ensure that we don't need to add classes that specifically accept smallint / tinyint to get the benefit of the magic function.
   
   Given that I have already verified that, though, I can remove these, as the explain output is quite likely to change across Spark versions.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a diff in pull request #5513: Spark 3.3 - Support bucket in FunctionCatalog

Posted by GitBox <gi...@apache.org>.
rdblue commented on code in PR #5513:
URL: https://github.com/apache/iceberg/pull/5513#discussion_r945205675


##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/functions/BucketFunction.java:
##########
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.functions;
+
+import java.nio.ByteBuffer;
+import java.util.Set;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
+import org.apache.iceberg.util.BucketUtil;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.connector.catalog.functions.BoundFunction;
+import org.apache.spark.sql.connector.catalog.functions.ScalarFunction;
+import org.apache.spark.sql.connector.catalog.functions.UnboundFunction;
+import org.apache.spark.sql.types.BinaryType;
+import org.apache.spark.sql.types.ByteType;
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.DateType;
+import org.apache.spark.sql.types.Decimal;
+import org.apache.spark.sql.types.DecimalType;
+import org.apache.spark.sql.types.IntegerType;
+import org.apache.spark.sql.types.LongType;
+import org.apache.spark.sql.types.ShortType;
+import org.apache.spark.sql.types.StringType;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.types.TimestampType;
+import org.apache.spark.unsafe.types.UTF8String;
+
+/**
+ * A Spark function implementation for the Iceberg bucket transform.
+ *
+ * <p>Example usage: {@code SELECT system.bucket(128, 'abc')}, which returns the bucket 122.
+ *
+ * <p>Note that for performance reasons, the given input number of buckets is not validated in the
+ * implementations used in code-gen. The number of buckets must be positive to give meaningful
+ * results.
+ */
+public class BucketFunction implements UnboundFunction {
+  private static final int NUM_BUCKETS_ORDINAL = 0;
+  private static final int VALUE_ORDINAL = 1;
+  private static final Set<DataType> SUPPORTED_NUM_BUCKETS_TYPES =
+      ImmutableSet.of(DataTypes.ByteType, DataTypes.ShortType, DataTypes.IntegerType);
+
+  @Override
+  public BoundFunction bind(StructType inputType) {
+    if (inputType.size() != 2) {
+      throw new UnsupportedOperationException(
+          "Wrong number of inputs (expected numBuckets and value)");
+    }
+
+    StructField numBucketsField = inputType.fields()[NUM_BUCKETS_ORDINAL];
+    StructField valueField = inputType.fields()[VALUE_ORDINAL];
+
+    if (!SUPPORTED_NUM_BUCKETS_TYPES.contains(numBucketsField.dataType())) {
+      throw new UnsupportedOperationException(
+          "Expected number of buckets to be tinyint, shortint or int");
+    }
+
+    // TODO - Support UUID type.
+    DataType type = valueField.dataType();
+    if (type instanceof DateType) {
+      return new BucketInt(DataTypes.DateType);
+    } else if (type instanceof ByteType
+        || type instanceof ShortType
+        || type instanceof IntegerType) {
+      return new BucketInt(DataTypes.IntegerType);
+    } else if (type instanceof LongType) {
+      return new BucketLong(DataTypes.LongType);
+    } else if (type instanceof TimestampType) {
+      return new BucketLong(DataTypes.TimestampType);
+    } else if (type instanceof DecimalType) {
+      return new BucketDecimal(((DecimalType) type).precision(), ((DecimalType) type).scale());
+    } else if (type instanceof StringType) {
+      return new BucketString();
+    } else if (type instanceof BinaryType) {
+      return new BucketBinary();
+    } else {
+      throw new UnsupportedOperationException(
+          "Expected bucketed column to be date, tinyint, smallint, int, bigint, decimal, timestamp, string, or binary");
+    }
+  }
+
+  @Override
+  public String description() {
+    return name()
+        + "(numBuckets, col) - Call Iceberg's bucket transform\n"
+        + "  numBuckets :: number of buckets to divide the rows into, e.g. bucket(100, 34) -> 79 (must be a tinyint, smallint, or int)\n"
+        + "  col :: column to bucket (must be a date, integer, long, timestamp, decimal, string, or binary)";
+  }
+
+  @Override
+  public String name() {
+    return "bucket";
+  }
+
+  public abstract static class BucketBase implements ScalarFunction<Integer> {
+    @Override
+    public String name() {
+      return "bucket";
+    }
+
+    @Override
+    public DataType resultType() {
+      return DataTypes.IntegerType;
+    }
+  }
+
+  // Used for both int and date - tinyint and smallint are upcasted to int by Spark.
+  public static class BucketInt extends BucketBase {
+    private final DataType sqlType;
+
+    // magic method used in codegen
+    public static int invoke(int numBuckets, int value) {
+      return (BucketUtil.hashInteger(value) & Integer.MAX_VALUE) % numBuckets;
+    }
+
+    public BucketInt(DataType sqlType) {
+      this.sqlType = sqlType;
+    }
+
+    @Override
+    public DataType[] inputTypes() {
+      return new DataType[] {DataTypes.IntegerType, sqlType};

Review Comment:
   Won't this prevent the input from being cast to an int by Spark? If so, the `invoke` function won't be called correctly.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a diff in pull request #5513: Spark 3.3 - Support bucket in FunctionCatalog

Posted by GitBox <gi...@apache.org>.
rdblue commented on code in PR #5513:
URL: https://github.com/apache/iceberg/pull/5513#discussion_r945205142


##########
api/src/main/java/org/apache/iceberg/util/BucketUtil.java:
##########
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.util;
+
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.UUID;
+import org.apache.iceberg.relocated.com.google.common.hash.HashFunction;
+import org.apache.iceberg.relocated.com.google.common.hash.Hashing;
+
+/**
+ * Contains the logic for hashing various types for use with the {@code bucket} partition
+ * transformations
+ */
+public class BucketUtil {
+
+  private static final HashFunction MURMUR3 = Hashing.murmur3_32_fixed();
+
+  private BucketUtil() {}
+
+  public static int hashInteger(Integer value) {
+    return MURMUR3.hashLong(value.longValue()).asInt();
+  }
+
+  public static int hashLong(Long value) {
+    return MURMUR3.hashLong(value).asInt();
+  }
+
+  public static int hashFloat(Float value) {
+    return MURMUR3.hashLong(Double.doubleToLongBits((double) value)).asInt();
+  }
+
+  public static int hashDouble(Double value) {
+    return MURMUR3.hashLong(Double.doubleToLongBits(value)).asInt();
+  }
+
+  public static int hashCharSequence(CharSequence value) {
+    return MURMUR3.hashString(value, StandardCharsets.UTF_8).asInt();
+  }
+
+  public static int hashByteBuffer(ByteBuffer value) {
+    if (value.hasArray()) {
+      return MURMUR3
+          .hashBytes(
+              value.array(),
+              value.arrayOffset() + value.position(),
+              value.arrayOffset() + value.remaining())

Review Comment:
   I just realized that this isn't correct. It has been wrong for years, evidently.
   
   [`HashFunction.hashBytes`](https://guava.dev/releases/19.0/api/docs/com/google/common/hash/HashFunction.html#hashBytes(byte[],%20int,%20int)) accepts a length. `value.remaining()` is that length.
   
   Looks like this was working because `arrayOffset` was never non-zero. That's because the only way to get a `ByteBuffer` with a non-zero `arrayOffset` (as far as I can tell) is to use `ByteBuffer.slice()`, which creates a copy of the `ByteBuffer` and sets its `arrayOffset`. Since `slice` doesn't allow setting the `position` and `limit`, everywhere that I've been able to find uses `duplicate()` and then sets `position` and `limit`, because there's no need to limit the start or capacity of the `ByteBuffer` when the backing array is not limited. Allocating or wrapping a byte array always produces `arrayOffset=0`.
   
   Here's a test that catches this. @kbendick, can you add this to `TestBucketing` along with the fix here (remove `value.arrayOffset()`)?
   
   ```java
     @Test
     public void testByteBufferOnHeapArrayOffset() {
       byte[] bytes = randomBytes(128);
       ByteBuffer raw = ByteBuffer.wrap(bytes, 5, 100);
       ByteBuffer buffer = raw.slice();
       Assert.assertEquals("Buffer arrayOffset should be 5", 5, buffer.arrayOffset());
   
       Bucket<ByteBuffer> bucketFunc = Bucket.get(Types.BinaryType.get(), 100);
   
       Assert.assertEquals(
           "HeapByteBuffer hash should match hash for correct slice",
           hashBytes(bytes, 5, 100),
           bucketFunc.hash(buffer));
   
       // verify that the buffer was not modified
       Assert.assertEquals("Buffer position should be 0", 0, buffer.position());
       Assert.assertEquals("Buffer limit should not change", 100, buffer.limit());
     }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] kbendick commented on a diff in pull request #5513: Spark 3.3 - Support bucket in FunctionCatalog

Posted by GitBox <gi...@apache.org>.
kbendick commented on code in PR #5513:
URL: https://github.com/apache/iceberg/pull/5513#discussion_r945207053


##########
api/src/main/java/org/apache/iceberg/util/BucketUtil.java:
##########
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.util;
+
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.UUID;
+import org.apache.iceberg.relocated.com.google.common.hash.HashFunction;
+import org.apache.iceberg.relocated.com.google.common.hash.Hashing;
+
+/**
+ * Contains the logic for hashing various types for use with the {@code bucket} partition
+ * transformations
+ */
+public class BucketUtil {
+
+  private static final HashFunction MURMUR3 = Hashing.murmur3_32_fixed();
+
+  private BucketUtil() {}
+
+  public static int hashInteger(Integer value) {
+    return MURMUR3.hashLong(value.longValue()).asInt();
+  }
+
+  public static int hashLong(Long value) {
+    return MURMUR3.hashLong(value).asInt();
+  }
+
+  public static int hashFloat(Float value) {
+    return MURMUR3.hashLong(Double.doubleToLongBits((double) value)).asInt();
+  }
+
+  public static int hashDouble(Double value) {
+    return MURMUR3.hashLong(Double.doubleToLongBits(value)).asInt();
+  }
+
+  public static int hashCharSequence(CharSequence value) {
+    return MURMUR3.hashString(value, StandardCharsets.UTF_8).asInt();
+  }
+
+  public static int hashByteBuffer(ByteBuffer value) {
+    if (value.hasArray()) {
+      return MURMUR3
+          .hashBytes(
+              value.array(),
+              value.arrayOffset() + value.position(),
+              value.arrayOffset() + value.remaining())

Review Comment:
   Yeah, absolutely, I'll add this test this evening.
   
   I was potentially going to use a different hash function for the byte[] that Spark passes (or investigate it at least), but I'll make this change this evening!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] kbendick commented on a diff in pull request #5513: Spark 3.3 - Support bucket in FunctionCatalog

Posted by GitBox <gi...@apache.org>.
kbendick commented on code in PR #5513:
URL: https://github.com/apache/iceberg/pull/5513#discussion_r946013296


##########
api/src/main/java/org/apache/iceberg/util/BucketUtil.java:
##########
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.util;
+
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.UUID;
+import org.apache.iceberg.relocated.com.google.common.hash.HashFunction;
+import org.apache.iceberg.relocated.com.google.common.hash.Hashing;
+
+/**
+ * Contains the logic for hashing various types for use with the {@code bucket} partition
+ * transformations
+ */
+public class BucketUtil {
+
+  private static final HashFunction MURMUR3 = Hashing.murmur3_32_fixed();
+
+  private BucketUtil() {}
+
+  public static int hashInteger(Integer value) {
+    return MURMUR3.hashLong(value.longValue()).asInt();
+  }
+
+  public static int hashLong(Long value) {
+    return MURMUR3.hashLong(value).asInt();
+  }
+
+  public static int hashFloat(Float value) {
+    return MURMUR3.hashLong(Double.doubleToLongBits((double) value)).asInt();
+  }
+
+  public static int hashDouble(Double value) {
+    return MURMUR3.hashLong(Double.doubleToLongBits(value)).asInt();
+  }
+
+  public static int hashCharSequence(CharSequence value) {
+    return MURMUR3.hashString(value, StandardCharsets.UTF_8).asInt();
+  }
+
+  public static int hashByteBuffer(ByteBuffer value) {
+    if (value.hasArray()) {
+      return MURMUR3
+          .hashBytes(
+              value.array(),
+              value.arrayOffset() + value.position(),
+              value.arrayOffset() + value.remaining())

Review Comment:
   Added the fix and the test.
   
   I can put it in a separate PR if we think that would be better for people who cherry-pick.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] kbendick commented on a diff in pull request #5513: Spark 3.3 - Support bucket in FunctionCatalog

Posted by GitBox <gi...@apache.org>.
kbendick commented on code in PR #5513:
URL: https://github.com/apache/iceberg/pull/5513#discussion_r946109646


##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/functions/BucketFunction.java:
##########
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.functions;
+
+import java.nio.ByteBuffer;
+import java.util.Set;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
+import org.apache.iceberg.util.BucketUtil;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.connector.catalog.functions.BoundFunction;
+import org.apache.spark.sql.connector.catalog.functions.ScalarFunction;
+import org.apache.spark.sql.connector.catalog.functions.UnboundFunction;
+import org.apache.spark.sql.types.BinaryType;
+import org.apache.spark.sql.types.ByteType;
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.DateType;
+import org.apache.spark.sql.types.Decimal;
+import org.apache.spark.sql.types.DecimalType;
+import org.apache.spark.sql.types.IntegerType;
+import org.apache.spark.sql.types.LongType;
+import org.apache.spark.sql.types.ShortType;
+import org.apache.spark.sql.types.StringType;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.types.TimestampType;
+import org.apache.spark.unsafe.types.UTF8String;
+
+/**
+ * A Spark function implementation for the Iceberg bucket transform.
+ *
+ * <p>Example usage: {@code SELECT system.bucket(128, 'abc')}, which returns the bucket 122.
+ *
+ * <p>Note that for performance reasons, the given input number of buckets is not validated in the
+ * implementations used in code-gen. The number of buckets must be positive to give meaningful
+ * results.
+ */
+public class BucketFunction implements UnboundFunction {
+  private static final int NUM_BUCKETS_ORDINAL = 0;
+  private static final int VALUE_ORDINAL = 1;
+  private static final Set<DataType> SUPPORTED_NUM_BUCKETS_TYPES =
+      ImmutableSet.of(DataTypes.ByteType, DataTypes.ShortType, DataTypes.IntegerType);
+
+  @Override
+  public BoundFunction bind(StructType inputType) {
+    if (inputType.size() != 2) {
+      throw new UnsupportedOperationException(
+          "Wrong number of inputs (expected numBuckets and value)");
+    }
+
+    StructField numBucketsField = inputType.fields()[NUM_BUCKETS_ORDINAL];
+    StructField valueField = inputType.fields()[VALUE_ORDINAL];
+
+    if (!SUPPORTED_NUM_BUCKETS_TYPES.contains(numBucketsField.dataType())) {
+      throw new UnsupportedOperationException(
+          "Expected number of buckets to be tinyint, shortint or int");
+    }
+
+    // TODO - Support UUID type.
+    DataType type = valueField.dataType();
+    if (type instanceof DateType) {
+      return new BucketInt(DataTypes.DateType);
+    } else if (type instanceof ByteType
+        || type instanceof ShortType
+        || type instanceof IntegerType) {
+      return new BucketInt(DataTypes.IntegerType);
+    } else if (type instanceof LongType) {
+      return new BucketLong(DataTypes.LongType);
+    } else if (type instanceof TimestampType) {
+      return new BucketLong(DataTypes.TimestampType);
+    } else if (type instanceof DecimalType) {
+      return new BucketDecimal(((DecimalType) type).precision(), ((DecimalType) type).scale());
+    } else if (type instanceof StringType) {
+      return new BucketString();
+    } else if (type instanceof BinaryType) {
+      return new BucketBinary();
+    } else {
+      throw new UnsupportedOperationException(
+          "Expected bucketed column to be date, tinyint, smallint, int, bigint, decimal, timestamp, string, or binary");
+    }
+  }
+
+  @Override
+  public String description() {
+    return name()
+        + "(numBuckets, col) - Call Iceberg's bucket transform\n"
+        + "  numBuckets :: number of buckets to divide the rows into, e.g. bucket(100, 34) -> 79 (must be a tinyint, smallint, or int)\n"
+        + "  col :: column to bucket (must be a date, integer, long, timestamp, decimal, string, or binary)";
+  }
+
+  @Override
+  public String name() {
+    return "bucket";
+  }
+
+  public abstract static class BucketBase implements ScalarFunction<Integer> {
+    @Override
+    public String name() {
+      return "bucket";
+    }
+
+    @Override
+    public DataType resultType() {
+      return DataTypes.IntegerType;
+    }
+  }
+
+  // Used for both int and date - tinyint and smallint are upcasted to int by Spark.
+  public static class BucketInt extends BucketBase {
+    private final DataType sqlType;
+
+    // magic method used in codegen
+    public static int invoke(int numBuckets, int value) {
+      return (BucketUtil.hashInteger(value) & Integer.MAX_VALUE) % numBuckets;

Review Comment:
   Made an `apply` method in the base class.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] kbendick commented on a diff in pull request #5513: Spark 3.3 - Support bucket in FunctionCatalog

Posted by GitBox <gi...@apache.org>.
kbendick commented on code in PR #5513:
URL: https://github.com/apache/iceberg/pull/5513#discussion_r946036575


##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/functions/BucketFunction.java:
##########
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.functions;
+
+import java.nio.ByteBuffer;
+import java.util.Set;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
+import org.apache.iceberg.util.BucketUtil;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.connector.catalog.functions.BoundFunction;
+import org.apache.spark.sql.connector.catalog.functions.ScalarFunction;
+import org.apache.spark.sql.connector.catalog.functions.UnboundFunction;
+import org.apache.spark.sql.types.BinaryType;
+import org.apache.spark.sql.types.ByteType;
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.DateType;
+import org.apache.spark.sql.types.Decimal;
+import org.apache.spark.sql.types.DecimalType;
+import org.apache.spark.sql.types.IntegerType;
+import org.apache.spark.sql.types.LongType;
+import org.apache.spark.sql.types.ShortType;
+import org.apache.spark.sql.types.StringType;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.types.TimestampType;
+import org.apache.spark.unsafe.types.UTF8String;
+
+/**
+ * A Spark function implementation for the Iceberg bucket transform.
+ *
+ * <p>Example usage: {@code SELECT system.bucket(128, 'abc')}, which returns the bucket 122.
+ *
+ * <p>Note that for performance reasons, the given input number of buckets is not validated in the
+ * implementations used in code-gen. The number of buckets must be positive to give meaningful
+ * results.
+ */
+public class BucketFunction implements UnboundFunction {
+  private static final int NUM_BUCKETS_ORDINAL = 0;
+  private static final int VALUE_ORDINAL = 1;
+  private static final Set<DataType> SUPPORTED_NUM_BUCKETS_TYPES =
+      ImmutableSet.of(DataTypes.ByteType, DataTypes.ShortType, DataTypes.IntegerType);
+
+  @Override
+  public BoundFunction bind(StructType inputType) {
+    if (inputType.size() != 2) {
+      throw new UnsupportedOperationException(
+          "Wrong number of inputs (expected numBuckets and value)");
+    }
+
+    StructField numBucketsField = inputType.fields()[NUM_BUCKETS_ORDINAL];
+    StructField valueField = inputType.fields()[VALUE_ORDINAL];
+
+    if (!SUPPORTED_NUM_BUCKETS_TYPES.contains(numBucketsField.dataType())) {
+      throw new UnsupportedOperationException(
+          "Expected number of buckets to be tinyint, shortint or int");
+    }
+
+    // TODO - Support UUID type.
+    DataType type = valueField.dataType();
+    if (type instanceof DateType) {
+      return new BucketInt(DataTypes.DateType);
+    } else if (type instanceof ByteType
+        || type instanceof ShortType
+        || type instanceof IntegerType) {
+      return new BucketInt(DataTypes.IntegerType);
+    } else if (type instanceof LongType) {
+      return new BucketLong(DataTypes.LongType);
+    } else if (type instanceof TimestampType) {
+      return new BucketLong(DataTypes.TimestampType);
+    } else if (type instanceof DecimalType) {
+      return new BucketDecimal(((DecimalType) type).precision(), ((DecimalType) type).scale());
+    } else if (type instanceof StringType) {
+      return new BucketString();
+    } else if (type instanceof BinaryType) {
+      return new BucketBinary();
+    } else {
+      throw new UnsupportedOperationException(
+          "Expected bucketed column to be date, tinyint, smallint, int, bigint, decimal, timestamp, string, or binary");
+    }
+  }
+
+  @Override
+  public String description() {
+    return name()
+        + "(numBuckets, col) - Call Iceberg's bucket transform\n"
+        + "  numBuckets :: number of buckets to divide the rows into, e.g. bucket(100, 34) -> 79 (must be a tinyint, smallint, or int)\n"
+        + "  col :: column to bucket (must be a date, integer, long, timestamp, decimal, string, or binary)";
+  }
+
+  @Override
+  public String name() {
+    return "bucket";
+  }
+
+  public abstract static class BucketBase implements ScalarFunction<Integer> {
+    @Override
+    public String name() {
+      return "bucket";
+    }
+
+    @Override
+    public DataType resultType() {
+      return DataTypes.IntegerType;
+    }
+  }
+
+  // Used for both int and date - tinyint and smallint are upcasted to int by Spark.
+  public static class BucketInt extends BucketBase {
+    private final DataType sqlType;
+
+    // magic method used in codegen
+    public static int invoke(int numBuckets, int value) {
+      return (BucketUtil.hashInteger(value) & Integer.MAX_VALUE) % numBuckets;
+    }
+
+    public BucketInt(DataType sqlType) {
+      this.sqlType = sqlType;
+    }
+
+    @Override
+    public DataType[] inputTypes() {
+      return new DataType[] {DataTypes.IntegerType, sqlType};
+    }
+
+    @Override
+    public String canonicalName() {
+      return String.format("iceberg.bucket(%s)", sqlType.catalogString());
+    }
+
+    @Override
+    public Integer produceResult(InternalRow input) {
+      // return null for null input to match what Spark does in the code-generated versions.
+      return input.isNullAt(NUM_BUCKETS_ORDINAL) || input.isNullAt(VALUE_ORDINAL)
+          ? null
+          : invoke(input.getInt(NUM_BUCKETS_ORDINAL), input.getInt(VALUE_ORDINAL));
+    }
+  }
+
+  // Used for both BigInt and Timestamp
+  public static class BucketLong extends BucketBase {
+    private final DataType sqlType;
+
+    // magic function for usage with codegen - needs to be static
+    public static int invoke(int numBuckets, long value) {
+      return (BucketUtil.hashLong(value) & Integer.MAX_VALUE) % numBuckets;
+    }
+
+    public BucketLong(DataType sqlType) {
+      this.sqlType = sqlType;
+    }
+
+    @Override
+    public DataType[] inputTypes() {
+      return new DataType[] {DataTypes.IntegerType, sqlType};
+    }
+
+    @Override
+    public String canonicalName() {
+      return String.format("iceberg.bucket(%s)", sqlType.catalogString());
+    }
+
+    @Override
+    public Integer produceResult(InternalRow input) {
+      return input.isNullAt(NUM_BUCKETS_ORDINAL) || input.isNullAt(VALUE_ORDINAL)
+          ? null
+          : invoke(input.getInt(NUM_BUCKETS_ORDINAL), input.getLong(VALUE_ORDINAL));
+    }
+  }
+
+  // bucketing by Float is not allowed by the spec, but this has the float hash implementation
+  public static class BucketFloat extends BucketBase {
+
+    public static int invoke(int numBuckets, float value) {
+      return (BucketUtil.hashFloat(value) & Integer.MAX_VALUE) % numBuckets;
+    }
+
+    @Override
+    public DataType[] inputTypes() {
+      return new DataType[] {DataTypes.IntegerType, DataTypes.FloatType};
+    }
+
+    @Override
+    public String canonicalName() {
+      return "iceberg.bucket(float)";
+    }
+
+    @Override
+    public Integer produceResult(InternalRow input) {
+      return input.isNullAt(NUM_BUCKETS_ORDINAL) || input.isNullAt(VALUE_ORDINAL)
+          ? null
+          : invoke(input.getInt(NUM_BUCKETS_ORDINAL), input.getFloat(VALUE_ORDINAL));
+    }
+  }
+
+  public static class BucketString extends BucketBase {
+    // magic function for usage with codegen
+    public static Integer invoke(int numBuckets, UTF8String value) {
+      if (value == null) {
+        return null;
+      }
+
+      return (BucketUtil.hashCharSequence(value.toString()) & Integer.MAX_VALUE) % numBuckets;
+    }
+
+    @Override
+    public DataType[] inputTypes() {
+      return new DataType[] {DataTypes.IntegerType, DataTypes.StringType};
+    }
+
+    @Override
+    public String canonicalName() {
+      return "iceberg.bucket(string)";
+    }
+
+    @Override
+    public Integer produceResult(InternalRow input) {
+      return input.isNullAt(NUM_BUCKETS_ORDINAL) || input.isNullAt(VALUE_ORDINAL)
+          ? null
+          : invoke(input.getInt(NUM_BUCKETS_ORDINAL), input.getUTF8String(VALUE_ORDINAL));

Review Comment:
   Actually, I think we wound up keeping the two null checks, because we wanted to avoid calling `input.getInt` etc. on null columns.
   
   I can remove it from just the few that don't access a primitive type via the specialized getters, but I like the consistency. This code is only used when code generation is disabled anyway, so it's relatively minor and I could go either way. But we did keep the check for all types in the end in the other PR for `TruncateFunction`: https://github.com/apache/iceberg/blob/ce5128f09cc697455e76af08ce6ce3c9c5b08b70/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/functions/TruncateFunction.java#L267-L274
   
   I can remove it here and there or just keep it consistent throughout. I prefer consistent throughout though.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a diff in pull request #5513: Spark 3.3 - Support bucket in FunctionCatalog

Posted by GitBox <gi...@apache.org>.
rdblue commented on code in PR #5513:
URL: https://github.com/apache/iceberg/pull/5513#discussion_r945205832


##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/functions/BucketFunction.java:
##########
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.functions;
+
+import java.nio.ByteBuffer;
+import java.util.Set;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
+import org.apache.iceberg.util.BucketUtil;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.connector.catalog.functions.BoundFunction;
+import org.apache.spark.sql.connector.catalog.functions.ScalarFunction;
+import org.apache.spark.sql.connector.catalog.functions.UnboundFunction;
+import org.apache.spark.sql.types.BinaryType;
+import org.apache.spark.sql.types.ByteType;
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.DateType;
+import org.apache.spark.sql.types.Decimal;
+import org.apache.spark.sql.types.DecimalType;
+import org.apache.spark.sql.types.IntegerType;
+import org.apache.spark.sql.types.LongType;
+import org.apache.spark.sql.types.ShortType;
+import org.apache.spark.sql.types.StringType;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.types.TimestampType;
+import org.apache.spark.unsafe.types.UTF8String;
+
+/**
+ * A Spark function implementation for the Iceberg bucket transform.
+ *
+ * <p>Example usage: {@code SELECT system.bucket(128, 'abc')}, which returns the bucket 122.
+ *
+ * <p>Note that for performance reasons, the given input number of buckets is not validated in the
+ * implementations used in code-gen. The number of buckets must be positive to give meaningful
+ * results.
+ */
+public class BucketFunction implements UnboundFunction {
+  private static final int NUM_BUCKETS_ORDINAL = 0;
+  private static final int VALUE_ORDINAL = 1;
+  private static final Set<DataType> SUPPORTED_NUM_BUCKETS_TYPES =
+      ImmutableSet.of(DataTypes.ByteType, DataTypes.ShortType, DataTypes.IntegerType);
+
+  @Override
+  public BoundFunction bind(StructType inputType) {
+    if (inputType.size() != 2) {
+      throw new UnsupportedOperationException(
+          "Wrong number of inputs (expected numBuckets and value)");
+    }
+
+    StructField numBucketsField = inputType.fields()[NUM_BUCKETS_ORDINAL];
+    StructField valueField = inputType.fields()[VALUE_ORDINAL];
+
+    if (!SUPPORTED_NUM_BUCKETS_TYPES.contains(numBucketsField.dataType())) {
+      throw new UnsupportedOperationException(
+          "Expected number of buckets to be tinyint, shortint or int");
+    }
+
+    // TODO - Support UUID type.
+    DataType type = valueField.dataType();
+    if (type instanceof DateType) {
+      return new BucketInt(DataTypes.DateType);
+    } else if (type instanceof ByteType
+        || type instanceof ShortType
+        || type instanceof IntegerType) {
+      return new BucketInt(DataTypes.IntegerType);
+    } else if (type instanceof LongType) {
+      return new BucketLong(DataTypes.LongType);
+    } else if (type instanceof TimestampType) {
+      return new BucketLong(DataTypes.TimestampType);
+    } else if (type instanceof DecimalType) {
+      return new BucketDecimal(((DecimalType) type).precision(), ((DecimalType) type).scale());
+    } else if (type instanceof StringType) {
+      return new BucketString();
+    } else if (type instanceof BinaryType) {
+      return new BucketBinary();
+    } else {
+      throw new UnsupportedOperationException(
+          "Expected bucketed column to be date, tinyint, smallint, int, bigint, decimal, timestamp, string, or binary");
+    }
+  }
+
+  @Override
+  public String description() {
+    return name()
+        + "(numBuckets, col) - Call Iceberg's bucket transform\n"
+        + "  numBuckets :: number of buckets to divide the rows into, e.g. bucket(100, 34) -> 79 (must be a tinyint, smallint, or int)\n"
+        + "  col :: column to bucket (must be a date, integer, long, timestamp, decimal, string, or binary)";
+  }
+
+  @Override
+  public String name() {
+    return "bucket";
+  }
+
+  public abstract static class BucketBase implements ScalarFunction<Integer> {
+    @Override
+    public String name() {
+      return "bucket";
+    }
+
+    @Override
+    public DataType resultType() {
+      return DataTypes.IntegerType;
+    }
+  }
+
+  // Used for both int and date - tinyint and smallint are upcasted to int by Spark.
+  public static class BucketInt extends BucketBase {
+    private final DataType sqlType;
+
+    // magic method used in codegen
+    public static int invoke(int numBuckets, int value) {
+      return (BucketUtil.hashInteger(value) & Integer.MAX_VALUE) % numBuckets;
+    }
+
+    public BucketInt(DataType sqlType) {
+      this.sqlType = sqlType;
+    }
+
+    @Override
+    public DataType[] inputTypes() {
+      return new DataType[] {DataTypes.IntegerType, sqlType};

Review Comment:
   Oh, I see. This is for `DateType`. Looks fine to me.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a diff in pull request #5513: Spark 3.3 - Support bucket in FunctionCatalog

Posted by GitBox <gi...@apache.org>.
rdblue commented on code in PR #5513:
URL: https://github.com/apache/iceberg/pull/5513#discussion_r945205343


##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/functions/BucketFunction.java:
##########
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.functions;
+
+import java.nio.ByteBuffer;
+import java.util.Set;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
+import org.apache.iceberg.util.BucketUtil;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.connector.catalog.functions.BoundFunction;
+import org.apache.spark.sql.connector.catalog.functions.ScalarFunction;
+import org.apache.spark.sql.connector.catalog.functions.UnboundFunction;
+import org.apache.spark.sql.types.BinaryType;
+import org.apache.spark.sql.types.ByteType;
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.DateType;
+import org.apache.spark.sql.types.Decimal;
+import org.apache.spark.sql.types.DecimalType;
+import org.apache.spark.sql.types.IntegerType;
+import org.apache.spark.sql.types.LongType;
+import org.apache.spark.sql.types.ShortType;
+import org.apache.spark.sql.types.StringType;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.types.TimestampType;
+import org.apache.spark.unsafe.types.UTF8String;
+
+/**
+ * A Spark function implementation for the Iceberg bucket transform.
+ *
+ * <p>Example usage: {@code SELECT system.bucket(128, 'abc')}, which returns the bucket 122.
+ *
+ * <p>Note that for performance reasons, the given input number of buckets is not validated in the
+ * implementations used in code-gen. The number of buckets must be positive to give meaningful
+ * results.
+ */
+public class BucketFunction implements UnboundFunction {
+  private static final int NUM_BUCKETS_ORDINAL = 0;
+  private static final int VALUE_ORDINAL = 1;
+  private static final Set<DataType> SUPPORTED_NUM_BUCKETS_TYPES =
+      ImmutableSet.of(DataTypes.ByteType, DataTypes.ShortType, DataTypes.IntegerType);
+
+  @Override
+  public BoundFunction bind(StructType inputType) {
+    if (inputType.size() != 2) {
+      throw new UnsupportedOperationException(
+          "Wrong number of inputs (expected numBuckets and value)");
+    }
+
+    StructField numBucketsField = inputType.fields()[NUM_BUCKETS_ORDINAL];
+    StructField valueField = inputType.fields()[VALUE_ORDINAL];
+
+    if (!SUPPORTED_NUM_BUCKETS_TYPES.contains(numBucketsField.dataType())) {
+      throw new UnsupportedOperationException(
+          "Expected number of buckets to be tinyint, shortint or int");
+    }
+
+    // TODO - Support UUID type.
+    DataType type = valueField.dataType();
+    if (type instanceof DateType) {
+      return new BucketInt(DataTypes.DateType);
+    } else if (type instanceof ByteType
+        || type instanceof ShortType
+        || type instanceof IntegerType) {
+      return new BucketInt(DataTypes.IntegerType);
+    } else if (type instanceof LongType) {
+      return new BucketLong(DataTypes.LongType);
+    } else if (type instanceof TimestampType) {
+      return new BucketLong(DataTypes.TimestampType);
+    } else if (type instanceof DecimalType) {
+      return new BucketDecimal(((DecimalType) type).precision(), ((DecimalType) type).scale());
+    } else if (type instanceof StringType) {
+      return new BucketString();
+    } else if (type instanceof BinaryType) {
+      return new BucketBinary();
+    } else {
+      throw new UnsupportedOperationException(
+          "Expected bucketed column to be date, tinyint, smallint, int, bigint, decimal, timestamp, string, or binary");

Review Comment:
   I think it is confusing to say "bucketed column" here. How about just "column"?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a diff in pull request #5513: Spark 3.3 - Support bucket in FunctionCatalog

Posted by GitBox <gi...@apache.org>.
rdblue commented on code in PR #5513:
URL: https://github.com/apache/iceberg/pull/5513#discussion_r945206238


##########
spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/sql/TestSparkBucketFunction.java:
##########
@@ -0,0 +1,330 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.sql;
+
+import java.nio.charset.StandardCharsets;
+import org.apache.iceberg.AssertHelpers;
+import org.apache.iceberg.relocated.com.google.common.io.BaseEncoding;
+import org.apache.iceberg.spark.SparkTestBaseWithCatalog;
+import org.apache.spark.sql.AnalysisException;
+import org.assertj.core.api.Assertions;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestSparkBucketFunction extends SparkTestBaseWithCatalog {
+  public TestSparkBucketFunction() {}
+
+  @Before
+  public void useCatalog() {
+    sql("USE %s", catalogName);
+  }
+
+  @Test
+  public void testBucketIntegers() {
+    Assert.assertEquals(
+        "Byte type should bucket similarly to integer",
+        3,

Review Comment:
   I would probably validate the result against the transform result. That seems safer to me.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue merged pull request #5513: Spark 3.3 - Support bucket in FunctionCatalog

Posted by GitBox <gi...@apache.org>.
rdblue merged PR #5513:
URL: https://github.com/apache/iceberg/pull/5513


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a diff in pull request #5513: Spark 3.3 - Support bucket in FunctionCatalog

Posted by GitBox <gi...@apache.org>.
rdblue commented on code in PR #5513:
URL: https://github.com/apache/iceberg/pull/5513#discussion_r945205282


##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/functions/BucketFunction.java:
##########
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.functions;
+
+import java.nio.ByteBuffer;
+import java.util.Set;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
+import org.apache.iceberg.util.BucketUtil;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.connector.catalog.functions.BoundFunction;
+import org.apache.spark.sql.connector.catalog.functions.ScalarFunction;
+import org.apache.spark.sql.connector.catalog.functions.UnboundFunction;
+import org.apache.spark.sql.types.BinaryType;
+import org.apache.spark.sql.types.ByteType;
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.DateType;
+import org.apache.spark.sql.types.Decimal;
+import org.apache.spark.sql.types.DecimalType;
+import org.apache.spark.sql.types.IntegerType;
+import org.apache.spark.sql.types.LongType;
+import org.apache.spark.sql.types.ShortType;
+import org.apache.spark.sql.types.StringType;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.types.TimestampType;
+import org.apache.spark.unsafe.types.UTF8String;
+
+/**
+ * A Spark function implementation for the Iceberg bucket transform.
+ *
+ * <p>Example usage: {@code SELECT system.bucket(128, 'abc')}, which returns the bucket 122.
+ *
+ * <p>Note that for performance reasons, the given input number of buckets is not validated in the
+ * implementations used in code-gen. The number of buckets must be positive to give meaningful
+ * results.
+ */
+public class BucketFunction implements UnboundFunction {
+  private static final int NUM_BUCKETS_ORDINAL = 0;
+  private static final int VALUE_ORDINAL = 1;
+  private static final Set<DataType> SUPPORTED_NUM_BUCKETS_TYPES =
+      ImmutableSet.of(DataTypes.ByteType, DataTypes.ShortType, DataTypes.IntegerType);
+
+  @Override
+  public BoundFunction bind(StructType inputType) {
+    if (inputType.size() != 2) {
+      throw new UnsupportedOperationException(
+          "Wrong number of inputs (expected numBuckets and value)");
+    }
+
+    StructField numBucketsField = inputType.fields()[NUM_BUCKETS_ORDINAL];
+    StructField valueField = inputType.fields()[VALUE_ORDINAL];
+
+    if (!SUPPORTED_NUM_BUCKETS_TYPES.contains(numBucketsField.dataType())) {
+      throw new UnsupportedOperationException(
+          "Expected number of buckets to be tinyint, shortint or int");
+    }
+
+    // TODO - Support UUID type.

Review Comment:
   There is no UUID type in Spark, so I don't think there is a way to expose this. You can probably remove the TODO.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] kbendick commented on a diff in pull request #5513: Spark 3.3 - Support bucket in FunctionCatalog

Posted by GitBox <gi...@apache.org>.
kbendick commented on code in PR #5513:
URL: https://github.com/apache/iceberg/pull/5513#discussion_r945207135


##########
api/src/main/java/org/apache/iceberg/util/BucketUtil.java:
##########
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.util;
+
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.UUID;
+import org.apache.iceberg.relocated.com.google.common.hash.HashFunction;
+import org.apache.iceberg.relocated.com.google.common.hash.Hashing;
+
+/**
+ * Contains the logic for hashing various types for use with the {@code bucket} partition
+ * transformations
+ */
+public class BucketUtil {
+
+  private static final HashFunction MURMUR3 = Hashing.murmur3_32_fixed();
+
+  private BucketUtil() {}
+
+  public static int hashInteger(Integer value) {

Review Comment:
   Good catch on both fronts.
   
   For the boxed types, I believe this is a holdover from a previous design when I had the hash function on the Spark subclasses as an abstract function (which didn't work). Will update this evening when I get home :)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a diff in pull request #5513: Spark 3.3 - Support bucket in FunctionCatalog

Posted by GitBox <gi...@apache.org>.
rdblue commented on code in PR #5513:
URL: https://github.com/apache/iceberg/pull/5513#discussion_r946062793


##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/functions/BucketFunction.java:
##########
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.functions;
+
+import java.nio.ByteBuffer;
+import java.util.Set;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
+import org.apache.iceberg.util.BucketUtil;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.connector.catalog.functions.BoundFunction;
+import org.apache.spark.sql.connector.catalog.functions.ScalarFunction;
+import org.apache.spark.sql.connector.catalog.functions.UnboundFunction;
+import org.apache.spark.sql.types.BinaryType;
+import org.apache.spark.sql.types.ByteType;
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.DateType;
+import org.apache.spark.sql.types.Decimal;
+import org.apache.spark.sql.types.DecimalType;
+import org.apache.spark.sql.types.IntegerType;
+import org.apache.spark.sql.types.LongType;
+import org.apache.spark.sql.types.ShortType;
+import org.apache.spark.sql.types.StringType;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.types.TimestampType;
+import org.apache.spark.unsafe.types.UTF8String;
+
+/**
+ * A Spark function implementation for the Iceberg bucket transform.
+ *
+ * <p>Example usage: {@code SELECT system.bucket(128, 'abc')}, which returns the bucket 122.
+ *
+ * <p>Note that for performance reasons, the given input number of buckets is not validated in the
+ * implementations used in code-gen. The number of buckets must be positive to give meaningful
+ * results.
+ */
+public class BucketFunction implements UnboundFunction {
+  private static final int NUM_BUCKETS_ORDINAL = 0;
+  private static final int VALUE_ORDINAL = 1;
+  private static final Set<DataType> SUPPORTED_NUM_BUCKETS_TYPES =
+      ImmutableSet.of(DataTypes.ByteType, DataTypes.ShortType, DataTypes.IntegerType);
+
+  @Override
+  public BoundFunction bind(StructType inputType) {
+    if (inputType.size() != 2) {
+      throw new UnsupportedOperationException(
+          "Wrong number of inputs (expected numBuckets and value)");
+    }
+
+    StructField numBucketsField = inputType.fields()[NUM_BUCKETS_ORDINAL];
+    StructField valueField = inputType.fields()[VALUE_ORDINAL];
+
+    if (!SUPPORTED_NUM_BUCKETS_TYPES.contains(numBucketsField.dataType())) {
+      throw new UnsupportedOperationException(
+          "Expected number of buckets to be tinyint, shortint or int");
+    }
+
+    // TODO - Support UUID type.
+    DataType type = valueField.dataType();
+    if (type instanceof DateType) {
+      return new BucketInt(DataTypes.DateType);
+    } else if (type instanceof ByteType
+        || type instanceof ShortType
+        || type instanceof IntegerType) {
+      return new BucketInt(DataTypes.IntegerType);
+    } else if (type instanceof LongType) {
+      return new BucketLong(DataTypes.LongType);
+    } else if (type instanceof TimestampType) {
+      return new BucketLong(DataTypes.TimestampType);
+    } else if (type instanceof DecimalType) {
+      return new BucketDecimal(((DecimalType) type).precision(), ((DecimalType) type).scale());
+    } else if (type instanceof StringType) {
+      return new BucketString();
+    } else if (type instanceof BinaryType) {
+      return new BucketBinary();
+    } else {
+      throw new UnsupportedOperationException(
+          "Expected bucketed column to be date, tinyint, smallint, int, bigint, decimal, timestamp, string, or binary");
+    }
+  }
+
+  @Override
+  public String description() {
+    return name()
+        + "(numBuckets, col) - Call Iceberg's bucket transform\n"
+        + "  numBuckets :: number of buckets to divide the rows into, e.g. bucket(100, 34) -> 79 (must be a tinyint, smallint, or int)\n"
+        + "  col :: column to bucket (must be a date, integer, long, timestamp, decimal, string, or binary)";
+  }
+
+  @Override
+  public String name() {
+    return "bucket";
+  }
+
+  public abstract static class BucketBase implements ScalarFunction<Integer> {
+    @Override
+    public String name() {
+      return "bucket";
+    }
+
+    @Override
+    public DataType resultType() {
+      return DataTypes.IntegerType;
+    }
+  }
+
+  // Used for both int and date - tinyint and smallint are upcasted to int by Spark.
+  public static class BucketInt extends BucketBase {
+    private final DataType sqlType;
+
+    // magic method used in codegen
+    public static int invoke(int numBuckets, int value) {
+      return (BucketUtil.hashInteger(value) & Integer.MAX_VALUE) % numBuckets;
+    }
+
+    public BucketInt(DataType sqlType) {
+      this.sqlType = sqlType;
+    }
+
+    @Override
+    public DataType[] inputTypes() {
+      return new DataType[] {DataTypes.IntegerType, sqlType};
+    }
+
+    @Override
+    public String canonicalName() {
+      return String.format("iceberg.bucket(%s)", sqlType.catalogString());
+    }
+
+    @Override
+    public Integer produceResult(InternalRow input) {
+      // return null for null input to match what Spark does in the code-generated versions.
+      return input.isNullAt(NUM_BUCKETS_ORDINAL) || input.isNullAt(VALUE_ORDINAL)
+          ? null
+          : invoke(input.getInt(NUM_BUCKETS_ORDINAL), input.getInt(VALUE_ORDINAL));
+    }
+  }
+
+  // Used for both BigInt and Timestamp
+  public static class BucketLong extends BucketBase {
+    private final DataType sqlType;
+
+    // magic function for usage with codegen - needs to be static
+    public static int invoke(int numBuckets, long value) {
+      return (BucketUtil.hashLong(value) & Integer.MAX_VALUE) % numBuckets;
+    }
+
+    public BucketLong(DataType sqlType) {
+      this.sqlType = sqlType;
+    }
+
+    @Override
+    public DataType[] inputTypes() {
+      return new DataType[] {DataTypes.IntegerType, sqlType};
+    }
+
+    @Override
+    public String canonicalName() {
+      return String.format("iceberg.bucket(%s)", sqlType.catalogString());
+    }
+
+    @Override
+    public Integer produceResult(InternalRow input) {
+      return input.isNullAt(NUM_BUCKETS_ORDINAL) || input.isNullAt(VALUE_ORDINAL)
+          ? null
+          : invoke(input.getInt(NUM_BUCKETS_ORDINAL), input.getLong(VALUE_ORDINAL));
+    }
+  }
+
+  // bucketing by Float is not allowed by the spec, but this has the float hash implementation
+  public static class BucketFloat extends BucketBase {
+
+    public static int invoke(int numBuckets, float value) {
+      return (BucketUtil.hashFloat(value) & Integer.MAX_VALUE) % numBuckets;
+    }
+
+    @Override
+    public DataType[] inputTypes() {
+      return new DataType[] {DataTypes.IntegerType, DataTypes.FloatType};
+    }
+
+    @Override
+    public String canonicalName() {
+      return "iceberg.bucket(float)";
+    }
+
+    @Override
+    public Integer produceResult(InternalRow input) {
+      return input.isNullAt(NUM_BUCKETS_ORDINAL) || input.isNullAt(VALUE_ORDINAL)
+          ? null
+          : invoke(input.getInt(NUM_BUCKETS_ORDINAL), input.getFloat(VALUE_ORDINAL));
+    }
+  }
+
+  public static class BucketString extends BucketBase {
+    // magic function for usage with codegen
+    public static Integer invoke(int numBuckets, UTF8String value) {
+      if (value == null) {
+        return null;
+      }
+
+      return (BucketUtil.hashCharSequence(value.toString()) & Integer.MAX_VALUE) % numBuckets;
+    }
+
+    @Override
+    public DataType[] inputTypes() {
+      return new DataType[] {DataTypes.IntegerType, DataTypes.StringType};
+    }
+
+    @Override
+    public String canonicalName() {
+      return "iceberg.bucket(string)";
+    }
+
+    @Override
+    public Integer produceResult(InternalRow input) {
+      return input.isNullAt(NUM_BUCKETS_ORDINAL) || input.isNullAt(VALUE_ORDINAL)
+          ? null
+          : invoke(input.getInt(NUM_BUCKETS_ORDINAL), input.getUTF8String(VALUE_ORDINAL));

Review Comment:
   You're right. We should keep it to avoid calling the getters.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] kbendick commented on a diff in pull request #5513: Spark 3.3 - Support bucket in FunctionCatalog

Posted by GitBox <gi...@apache.org>.
kbendick commented on code in PR #5513:
URL: https://github.com/apache/iceberg/pull/5513#discussion_r944764313


##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/functions/BucketFunction.java:
##########
@@ -0,0 +1,308 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.functions;
+
+import java.nio.ByteBuffer;
+import java.util.Set;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
+import org.apache.iceberg.util.BucketHashUtil;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.connector.catalog.functions.BoundFunction;
+import org.apache.spark.sql.connector.catalog.functions.ScalarFunction;
+import org.apache.spark.sql.connector.catalog.functions.UnboundFunction;
+import org.apache.spark.sql.types.BinaryType;
+import org.apache.spark.sql.types.ByteType;
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.DateType;
+import org.apache.spark.sql.types.Decimal;
+import org.apache.spark.sql.types.DecimalType;
+import org.apache.spark.sql.types.IntegerType;
+import org.apache.spark.sql.types.LongType;
+import org.apache.spark.sql.types.ShortType;
+import org.apache.spark.sql.types.StringType;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.types.TimestampType;
+import org.apache.spark.unsafe.types.UTF8String;
+
+/**
+ * A Spark function implementation for the Iceberg bucket transform.
+ *
+ * <p>Example usage: {@code SELECT system.bucket(1, 'abc')}, which returns the bucket.
+ *
+ * <p>Note that for performance reasons, the given input number of buckets is not validated in the
+ * implementations used in code-gen. The number of buckets must be positive to give meaningful
+ * results.
+ */
+public class BucketFunction implements UnboundFunction {
+  private static final int NUM_BUCKETS_ORDINAL = 0;
+  private static final int VALUE_ORDINAL = 1;
+  private static final Set<DataType> SUPPORTED_NUM_BUCKETS_TYPES =
+      ImmutableSet.of(DataTypes.ByteType, DataTypes.ShortType, DataTypes.IntegerType);
+
+  @Override
+  public BoundFunction bind(StructType inputType) {
+    if (inputType.size() != 2) {
+      throw new UnsupportedOperationException(
+          "Wrong number of inputs (expected numBuckets and value)");
+    }
+
+    StructField numBucketsField = inputType.fields()[NUM_BUCKETS_ORDINAL];
+    StructField valueField = inputType.fields()[VALUE_ORDINAL];
+
+    if (!SUPPORTED_NUM_BUCKETS_TYPES.contains(numBucketsField.dataType())) {
+      throw new UnsupportedOperationException(
+          "Expected number of buckets to be tinyint, shortint or int");
+    }
+
+    DataType type = valueField.dataType();
+    if (type instanceof DateType) {
+      return new BucketInt(DataTypes.DateType);
+    } else if (type instanceof ByteType
+        || type instanceof ShortType
+        || type instanceof IntegerType) {
+      return new BucketInt(DataTypes.IntegerType);
+    } else if (type instanceof LongType) {
+      return new BucketLong(DataTypes.LongType);
+    } else if (type instanceof TimestampType) {
+      return new BucketLong(DataTypes.TimestampType);
+    } else if (type instanceof DecimalType) {
+      return new BucketDecimal(((DecimalType) type).precision(), ((DecimalType) type).scale());
+    } else if (type instanceof StringType) {
+      return new BucketString();
+    } else if (type instanceof BinaryType) {
+      return new BucketBinary();
+    } else {
+      throw new UnsupportedOperationException(
+          "Expected bucketed column to be date, tinyint, smallint, int, bigint, decimal, timestamp, string, or binary");
+    }
+  }
+
+  @Override
+  public String description() {
+    return name()
+        + "(numBuckets, col) - Call Iceberg's bucket transform\n"
+        + "  numBuckets :: number of buckets to divide the rows into, e.g. bucket(100, 34) -> 79 (must be a tinyint, smallint, or int)\n"
+        + "  col :: column to bucket (must be a date, integer, long, timestamp, decimal, string, or binary)";
+  }
+
+  @Override
+  public String name() {
+    return "bucket";
+  }
+
+  public abstract static class BucketBase implements ScalarFunction<Integer> {
+    @Override
+    public String name() {
+      return "bucket";
+    }
+
+    @Override
+    public DataType resultType() {
+      return DataTypes.IntegerType;
+    }
+  }
+
+  // Used for both int and date - tinyint and smallint are upcasted to int by Spark.
+  public static class BucketInt extends BucketBase {
+    private final DataType sqlType;
+
+    // magic method used in codegen
+    public static int invoke(int numBuckets, int value) {
+      return (BucketHashUtil.forInteger(value) & Integer.MAX_VALUE) % numBuckets;
+    }
+
+    public BucketInt(DataType sqlType) {
+      this.sqlType = sqlType;
+    }
+
+    @Override
+    public Integer produceResult(InternalRow input) {
+      // return null for null input to match what Spark does in the code-generated versions.
+      return input.isNullAt(NUM_BUCKETS_ORDINAL) || input.isNullAt(VALUE_ORDINAL)
+          ? null
+          : invoke(input.getInt(NUM_BUCKETS_ORDINAL), input.getInt(VALUE_ORDINAL));
+    }
+
+    @Override
+    public DataType[] inputTypes() {
+      return new DataType[] {DataTypes.IntegerType, sqlType};
+    }
+
+    @Override
+    public String canonicalName() {
+      return String.format("iceberg.bucket(%s)", sqlType.catalogString());
+    }
+  }
+
+  // Used for both BigInt and Timestamp
+  public static class BucketLong extends BucketBase {
+    private final DataType sqlType;
+
+    // magic function for usage with codegen - needs to be static
+    public static int invoke(int numBuckets, long value) {
+      return (BucketHashUtil.forLong(value) & Integer.MAX_VALUE) % numBuckets;
+    }
+
+    public BucketLong(DataType sqlType) {
+      this.sqlType = sqlType;
+    }
+
+    @Override
+    public DataType[] inputTypes() {
+      return new DataType[] {DataTypes.IntegerType, sqlType};
+    }
+
+    @Override
+    public Integer produceResult(InternalRow input) {
+      // return null for null input to match what Spark does in the code-generated versions.
+      return input.isNullAt(NUM_BUCKETS_ORDINAL) || input.isNullAt(VALUE_ORDINAL)
+          ? null
+          : invoke(input.getInt(NUM_BUCKETS_ORDINAL), input.getLong(VALUE_ORDINAL));
+    }
+
+    @Override
+    public String canonicalName() {
+      return String.format("iceberg.bucket(%s)", sqlType.catalogString());
+    }
+  }
+
+  // bucketing by Float is not allowed by the spec, but this has the float hash implementation
+  public static class BucketFloat extends BucketBase {

Review Comment:
   We have `Bucket` implementations for some types which are not allowed by the spec (including `BucketFloat`).
   
   For simplicity, I chose not to implement those in this PR.
   
   If we want to allow people to use that bucketing function from SQL (even though it's not allowed as a transform in the spec), we can implement them.
   
   Otherwise I'll probably remove this class instead.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on pull request #5513: Spark 3.3 - Support bucket in FunctionCatalog

Posted by GitBox <gi...@apache.org>.
rdblue commented on PR #5513:
URL: https://github.com/apache/iceberg/pull/5513#issuecomment-1219714547

   @kbendick, I think I was waiting for tests to pass. Looks good now.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a diff in pull request #5513: Spark 3.3 - Support bucket in FunctionCatalog

Posted by GitBox <gi...@apache.org>.
rdblue commented on code in PR #5513:
URL: https://github.com/apache/iceberg/pull/5513#discussion_r945206146


##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/functions/BucketFunction.java:
##########
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.functions;
+
+import java.nio.ByteBuffer;
+import java.util.Set;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
+import org.apache.iceberg.util.BucketUtil;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.connector.catalog.functions.BoundFunction;
+import org.apache.spark.sql.connector.catalog.functions.ScalarFunction;
+import org.apache.spark.sql.connector.catalog.functions.UnboundFunction;
+import org.apache.spark.sql.types.BinaryType;
+import org.apache.spark.sql.types.ByteType;
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.DateType;
+import org.apache.spark.sql.types.Decimal;
+import org.apache.spark.sql.types.DecimalType;
+import org.apache.spark.sql.types.IntegerType;
+import org.apache.spark.sql.types.LongType;
+import org.apache.spark.sql.types.ShortType;
+import org.apache.spark.sql.types.StringType;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.types.TimestampType;
+import org.apache.spark.unsafe.types.UTF8String;
+
+/**
+ * A Spark function implementation for the Iceberg bucket transform.
+ *
+ * <p>Example usage: {@code SELECT system.bucket(128, 'abc')}, which returns the bucket 122.
+ *
+ * <p>Note that for performance reasons, the given input number of buckets is not validated in the
+ * implementations used in code-gen. The number of buckets must be positive to give meaningful
+ * results.
+ */
+public class BucketFunction implements UnboundFunction {
+  private static final int NUM_BUCKETS_ORDINAL = 0;
+  private static final int VALUE_ORDINAL = 1;
+  private static final Set<DataType> SUPPORTED_NUM_BUCKETS_TYPES =
+      ImmutableSet.of(DataTypes.ByteType, DataTypes.ShortType, DataTypes.IntegerType);
+
+  @Override
+  public BoundFunction bind(StructType inputType) {
+    if (inputType.size() != 2) {
+      throw new UnsupportedOperationException(
+          "Wrong number of inputs (expected numBuckets and value)");
+    }
+
+    StructField numBucketsField = inputType.fields()[NUM_BUCKETS_ORDINAL];
+    StructField valueField = inputType.fields()[VALUE_ORDINAL];
+
+    if (!SUPPORTED_NUM_BUCKETS_TYPES.contains(numBucketsField.dataType())) {
+      throw new UnsupportedOperationException(
+          "Expected number of buckets to be tinyint, shortint or int");
+    }
+
+    // TODO - Support UUID type.
+    DataType type = valueField.dataType();
+    if (type instanceof DateType) {
+      return new BucketInt(DataTypes.DateType);
+    } else if (type instanceof ByteType
+        || type instanceof ShortType
+        || type instanceof IntegerType) {
+      return new BucketInt(DataTypes.IntegerType);
+    } else if (type instanceof LongType) {
+      return new BucketLong(DataTypes.LongType);
+    } else if (type instanceof TimestampType) {
+      return new BucketLong(DataTypes.TimestampType);
+    } else if (type instanceof DecimalType) {
+      return new BucketDecimal(((DecimalType) type).precision(), ((DecimalType) type).scale());
+    } else if (type instanceof StringType) {
+      return new BucketString();
+    } else if (type instanceof BinaryType) {
+      return new BucketBinary();
+    } else {
+      throw new UnsupportedOperationException(
+          "Expected bucketed column to be date, tinyint, smallint, int, bigint, decimal, timestamp, string, or binary");
+    }
+  }
+
+  @Override
+  public String description() {
+    return name()
+        + "(numBuckets, col) - Call Iceberg's bucket transform\n"
+        + "  numBuckets :: number of buckets to divide the rows into, e.g. bucket(100, 34) -> 79 (must be a tinyint, smallint, or int)\n"
+        + "  col :: column to bucket (must be a date, integer, long, timestamp, decimal, string, or binary)";
+  }
+
+  @Override
+  public String name() {
+    return "bucket";
+  }
+
+  public abstract static class BucketBase implements ScalarFunction<Integer> {
+    @Override
+    public String name() {
+      return "bucket";
+    }
+
+    @Override
+    public DataType resultType() {
+      return DataTypes.IntegerType;
+    }
+  }
+
+  // Used for both int and date - tinyint and smallint are upcasted to int by Spark.
+  public static class BucketInt extends BucketBase {
+    private final DataType sqlType;
+
+    // magic method used in codegen
+    public static int invoke(int numBuckets, int value) {
+      return (BucketUtil.hashInteger(value) & Integer.MAX_VALUE) % numBuckets;
+    }
+
+    public BucketInt(DataType sqlType) {
+      this.sqlType = sqlType;
+    }
+
+    @Override
+    public DataType[] inputTypes() {
+      return new DataType[] {DataTypes.IntegerType, sqlType};
+    }
+
+    @Override
+    public String canonicalName() {
+      return String.format("iceberg.bucket(%s)", sqlType.catalogString());
+    }
+
+    @Override
+    public Integer produceResult(InternalRow input) {
+      // return null for null input to match what Spark does in the code-generated versions.
+      return input.isNullAt(NUM_BUCKETS_ORDINAL) || input.isNullAt(VALUE_ORDINAL)
+          ? null
+          : invoke(input.getInt(NUM_BUCKETS_ORDINAL), input.getInt(VALUE_ORDINAL));
+    }
+  }
+
+  // Used for both BigInt and Timestamp
+  public static class BucketLong extends BucketBase {
+    private final DataType sqlType;
+
+    // magic function for usage with codegen - needs to be static
+    public static int invoke(int numBuckets, long value) {
+      return (BucketUtil.hashLong(value) & Integer.MAX_VALUE) % numBuckets;
+    }
+
+    public BucketLong(DataType sqlType) {
+      this.sqlType = sqlType;
+    }
+
+    @Override
+    public DataType[] inputTypes() {
+      return new DataType[] {DataTypes.IntegerType, sqlType};
+    }
+
+    @Override
+    public String canonicalName() {
+      return String.format("iceberg.bucket(%s)", sqlType.catalogString());
+    }
+
+    @Override
+    public Integer produceResult(InternalRow input) {
+      return input.isNullAt(NUM_BUCKETS_ORDINAL) || input.isNullAt(VALUE_ORDINAL)
+          ? null
+          : invoke(input.getInt(NUM_BUCKETS_ORDINAL), input.getLong(VALUE_ORDINAL));
+    }
+  }
+
+  // bucketing by Float is not allowed by the spec, but this has the float hash implementation
+  public static class BucketFloat extends BucketBase {
+
+    public static int invoke(int numBuckets, float value) {
+      return (BucketUtil.hashFloat(value) & Integer.MAX_VALUE) % numBuckets;
+    }
+
+    @Override
+    public DataType[] inputTypes() {
+      return new DataType[] {DataTypes.IntegerType, DataTypes.FloatType};
+    }
+
+    @Override
+    public String canonicalName() {
+      return "iceberg.bucket(float)";
+    }
+
+    @Override
+    public Integer produceResult(InternalRow input) {
+      return input.isNullAt(NUM_BUCKETS_ORDINAL) || input.isNullAt(VALUE_ORDINAL)
+          ? null
+          : invoke(input.getInt(NUM_BUCKETS_ORDINAL), input.getFloat(VALUE_ORDINAL));
+    }
+  }
+
+  public static class BucketString extends BucketBase {
+    // magic function for usage with codegen
+    public static Integer invoke(int numBuckets, UTF8String value) {
+      if (value == null) {
+        return null;
+      }
+
+      return (BucketUtil.hashCharSequence(value.toString()) & Integer.MAX_VALUE) % numBuckets;
+    }
+
+    @Override
+    public DataType[] inputTypes() {
+      return new DataType[] {DataTypes.IntegerType, DataTypes.StringType};
+    }
+
+    @Override
+    public String canonicalName() {
+      return "iceberg.bucket(string)";
+    }
+
+    @Override
+    public Integer produceResult(InternalRow input) {
+      return input.isNullAt(NUM_BUCKETS_ORDINAL) || input.isNullAt(VALUE_ORDINAL)
+          ? null
+          : invoke(input.getInt(NUM_BUCKETS_ORDINAL), input.getUTF8String(VALUE_ORDINAL));
+    }
+  }
+
+  public static class BucketBinary extends BucketBase {
+    public static Integer invoke(int numBuckets, byte[] value) {
+      if (value == null) {
+        return null;
+      }
+
+      return (BucketUtil.hashByteBuffer(ByteBuffer.wrap(value)) & Integer.MAX_VALUE) % numBuckets;
+    }
+
+    @Override
+    public DataType[] inputTypes() {
+      return new DataType[] {DataTypes.IntegerType, DataTypes.BinaryType};
+    }
+
+    @Override
+    public Integer produceResult(InternalRow input) {
+      return input.isNullAt(NUM_BUCKETS_ORDINAL) || input.isNullAt(VALUE_ORDINAL)
+          ? null
+          : invoke(input.getInt(NUM_BUCKETS_ORDINAL), input.getBinary(VALUE_ORDINAL));
+    }
+
+    @Override
+    public String canonicalName() {
+      return "iceberg.bucket(binary)";
+    }
+  }
+
+  public static class BucketDecimal extends BucketBase {
+    private final int precision;
+    private final int scale;
+
+    // magic method used in codegen
+    public static Integer invoke(int numBuckets, Decimal value) {
+      if (value == null) {
+        return null;
+      }
+
+      return (BucketUtil.hashDecimal(value.toJavaBigDecimal()) & Integer.MAX_VALUE) % numBuckets;
+    }
+
+    public BucketDecimal(int precision, int scale) {

Review Comment:
   The hash function is actually independent of precision and scale. It looks like the only reason to pass precision and scale (other than to preserve the input type) is for the canonical name. I think instead this can pass in the Spark type and pass that back through `inputTypes`. The canonical name should be `iceberg.bucket(decimal)`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] kbendick commented on a diff in pull request #5513: Spark 3.3 - Support bucket in FunctionCatalog

Posted by GitBox <gi...@apache.org>.
kbendick commented on code in PR #5513:
URL: https://github.com/apache/iceberg/pull/5513#discussion_r945207790


##########
spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/sql/TestSparkBucketFunction.java:
##########
@@ -0,0 +1,330 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.sql;
+
+import java.nio.charset.StandardCharsets;
+import org.apache.iceberg.AssertHelpers;
+import org.apache.iceberg.relocated.com.google.common.io.BaseEncoding;
+import org.apache.iceberg.spark.SparkTestBaseWithCatalog;
+import org.apache.spark.sql.AnalysisException;
+import org.assertj.core.api.Assertions;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestSparkBucketFunction extends SparkTestBaseWithCatalog {
+  public TestSparkBucketFunction() {}
+
+  @Before
+  public void useCatalog() {
+    sql("USE %s", catalogName);
+  }
+
+  @Test
+  public void testBucketIntegers() {
+    Assert.assertEquals(
+        "Byte type should bucket similarly to integer",
+        3,
+        scalarSql("SELECT system.bucket(10, 8Y)"));
+    Assert.assertEquals(
+        "Short type should bucket similarly to integer",
+        3,
+        scalarSql("SELECT system.bucket(10, 8S)"));
+    // Integers
+    Assert.assertEquals(3, scalarSql("SELECT system.bucket(10, 8)"));
+    Assert.assertEquals(79, scalarSql("SELECT system.bucket(100, 34)"));
+    Assert.assertNull(scalarSql("SELECT system.bucket(1, CAST(null AS INT))"));
+  }
+
+  @Test
+  public void testBucketDates() {
+    Assert.assertEquals(3, scalarSql("SELECT system.bucket(10, date('1970-01-09'))"));
+    Assert.assertEquals(79, scalarSql("SELECT system.bucket(100, date('1970-02-04'))"));
+    Assert.assertNull(scalarSql("SELECT system.bucket(1, CAST(null AS DATE))"));
+  }
+
+  @Test
+  public void testBucketLong() {
+    Assert.assertEquals(79, scalarSql("SELECT system.bucket(100, 34L)"));
+    Assert.assertEquals(76, scalarSql("SELECT system.bucket(100, 0L)"));
+    Assert.assertEquals(97, scalarSql("SELECT system.bucket(100, -34L)"));
+    Assert.assertEquals(0, scalarSql("SELECT system.bucket(2, -1L)"));
+    Assert.assertNull(scalarSql("SELECT system.bucket(2, CAST(null AS LONG))"));
+  }
+
+  @Test
+  public void testBucketDecimal() {
+    Assert.assertEquals(56, scalarSql("SELECT system.bucket(64, CAST('12.34' as DECIMAL(9, 2)))"));
+    Assert.assertEquals(13, scalarSql("SELECT system.bucket(18, CAST('12.30' as DECIMAL(9, 2)))"));
+    Assert.assertEquals(2, scalarSql("SELECT system.bucket(16, CAST('12.999' as DECIMAL(9, 3)))"));
+    Assert.assertEquals(21, scalarSql("SELECT system.bucket(32, CAST('0.05' as DECIMAL(5, 2)))"));
+    Assert.assertEquals(85, scalarSql("SELECT system.bucket(128, CAST('0.05' as DECIMAL(9, 2)))"));
+    Assert.assertEquals(3, scalarSql("SELECT system.bucket(18, CAST('0.05' as DECIMAL(9, 2)))"));
+
+    Assert.assertNull(
+        "Null input should return null",
+        scalarSql("SELECT system.bucket(2, CAST(null AS decimal))"));
+  }
+
+  @Test
+  public void testBucketTimestamp() {
+    Assert.assertEquals(
+        99, scalarSql("SELECT system.bucket(100, TIMESTAMP '1997-01-01 00:00:00 UTC+00:00')"));
+    Assert.assertEquals(
+        85, scalarSql("SELECT system.bucket(100, TIMESTAMP '1997-01-31 09:26:56 UTC+00:00')"));
+    Assert.assertEquals(
+        62, scalarSql("SELECT system.bucket(100, TIMESTAMP '2022-08-08 00:00:00 UTC+00:00')"));
+    Assert.assertNull(scalarSql("SELECT system.bucket(2, CAST(null AS timestamp))"));
+  }
+
+  @Test
+  public void testBucketString() {
+    Assert.assertEquals(4, scalarSql("SELECT system.bucket(5, 'abcdefg')"));
+    Assert.assertEquals(122, scalarSql("SELECT system.bucket(128, 'abc')"));
+    Assert.assertEquals(54, scalarSql("SELECT system.bucket(64, 'abcde')"));
+    Assert.assertEquals(8, scalarSql("SELECT system.bucket(12, '测试')"));
+    Assert.assertEquals(1, scalarSql("SELECT system.bucket(16, '测试raul试测')"));
+    Assert.assertEquals(
+        "Varchar should work like string",
+        1,
+        scalarSql("SELECT system.bucket(16, CAST('测试raul试测' AS varchar(8)))"));
+    Assert.assertEquals(
+        "Char should work like string",
+        1,
+        scalarSql("SELECT system.bucket(16, CAST('测试raul试测' AS char(8)))"));
+    Assert.assertEquals(
+        "Should not fail on the empty string", 0, scalarSql("SELECT system.bucket(16, '')"));
+    Assert.assertNull(
+        "Null input should return null as output",
+        scalarSql("SELECT system.bucket(16, CAST(null AS string))"));
+  }
+
+  @Test
+  public void testBucketBinary() {
+    Assert.assertEquals(
+        1, scalarSql("SELECT system.bucket(10, X'0102030405060708090a0b0c0d0e0f')"));
+    Assert.assertEquals(10, scalarSql("SELECT system.bucket(12, %s)", asBytesLiteral("abcdefg")));
+    Assert.assertEquals(13, scalarSql("SELECT system.bucket(18, %s)", asBytesLiteral("abc\0\0")));
+    Assert.assertEquals(42, scalarSql("SELECT system.bucket(48, %s)", asBytesLiteral("abc")));
+    Assert.assertEquals(3, scalarSql("SELECT system.bucket(16, %s)", asBytesLiteral("测试_")));

Review Comment:
   There aren't that many test cases in the Bucket transform, IIRC.
   
   I'll be sure to grab all of the test cases that are in the bucket transform to ensure consistency, in addition to checking the transform result instead of just the final bucketed result.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] kbendick commented on a diff in pull request #5513: Spark 3.3 - Support bucket in FunctionCatalog

Posted by GitBox <gi...@apache.org>.
kbendick commented on code in PR #5513:
URL: https://github.com/apache/iceberg/pull/5513#discussion_r944719112


##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/functions/BucketFunction.java:
##########
@@ -0,0 +1,308 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.functions;
+
+import java.nio.ByteBuffer;
+import java.util.Set;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
+import org.apache.iceberg.util.BucketHashUtil;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.connector.catalog.functions.BoundFunction;
+import org.apache.spark.sql.connector.catalog.functions.ScalarFunction;
+import org.apache.spark.sql.connector.catalog.functions.UnboundFunction;
+import org.apache.spark.sql.types.BinaryType;
+import org.apache.spark.sql.types.ByteType;
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.DateType;
+import org.apache.spark.sql.types.Decimal;
+import org.apache.spark.sql.types.DecimalType;
+import org.apache.spark.sql.types.IntegerType;
+import org.apache.spark.sql.types.LongType;
+import org.apache.spark.sql.types.ShortType;
+import org.apache.spark.sql.types.StringType;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.types.TimestampType;
+import org.apache.spark.unsafe.types.UTF8String;
+
+/**
+ * A Spark function implementation for the Iceberg bucket transform.
+ *
+ * <p>Example usage: {@code SELECT system.bucket(1, 'abc')}, which returns the bucket.
+ *
+ * <p>Note that for performance reasons, the given input number of buckets is not validated in the
+ * implementations used in code-gen. The number of buckets must be positive to give meaningful
+ * results.
+ */
+public class BucketFunction implements UnboundFunction {
+  private static final int NUM_BUCKETS_ORDINAL = 0;
+  private static final int VALUE_ORDINAL = 1;
+  private static final Set<DataType> SUPPORTED_NUM_BUCKETS_TYPES =
+      ImmutableSet.of(DataTypes.ByteType, DataTypes.ShortType, DataTypes.IntegerType);
+
+  @Override
+  public BoundFunction bind(StructType inputType) {
+    if (inputType.size() != 2) {
+      throw new UnsupportedOperationException(
+          "Wrong number of inputs (expected numBuckets and value)");
+    }
+
+    StructField numBucketsField = inputType.fields()[NUM_BUCKETS_ORDINAL];
+    StructField valueField = inputType.fields()[VALUE_ORDINAL];
+
+    if (!SUPPORTED_NUM_BUCKETS_TYPES.contains(numBucketsField.dataType())) {
+      throw new UnsupportedOperationException(
+          "Expected number of buckets to be tinyint, shortint or int");
+    }
+
+    DataType type = valueField.dataType();
+    if (type instanceof DateType) {
+      return new BucketInt(DataTypes.DateType);
+    } else if (type instanceof ByteType
+        || type instanceof ShortType
+        || type instanceof IntegerType) {
+      return new BucketInt(DataTypes.IntegerType);
+    } else if (type instanceof LongType) {
+      return new BucketLong(DataTypes.LongType);
+    } else if (type instanceof TimestampType) {
+      return new BucketLong(DataTypes.TimestampType);
+    } else if (type instanceof DecimalType) {
+      return new BucketDecimal(((DecimalType) type).precision(), ((DecimalType) type).scale());
+    } else if (type instanceof StringType) {
+      return new BucketString();
+    } else if (type instanceof BinaryType) {
+      return new BucketBinary();
+    } else {
+      throw new UnsupportedOperationException(
+          "Expected bucketed column to be date, tinyint, smallint, int, bigint, decimal, timestamp, string, or binary");
+    }
+  }
+
+  @Override
+  public String description() {
+    return name()
+        + "(numBuckets, col) - Call Iceberg's bucket transform\n"
+        + "  numBuckets :: number of buckets to divide the rows into, e.g. bucket(100, 34) -> 79 (must be a tinyint, smallint, or int)\n"
+        + "  col :: column to bucket (must be a date, integer, long, timestamp, decimal, string, or binary)";
+  }
+
+  @Override
+  public String name() {
+    return "bucket";
+  }
+
+  public abstract static class BucketBase<T> implements ScalarFunction<Integer> {
+    @Override
+    public String name() {
+      return "bucket";
+    }
+
+    @Override
+    public DataType resultType() {
+      return DataTypes.IntegerType;
+    }
+  }
+
+  // Used for both int and date - tinyint and smallint are upcasted to int by Spark.
+  public static class BucketInt extends BucketBase<Integer> {
+    private final DataType sqlType;
+
+    // magic method used in codegen
+    public static int invoke(int numBuckets, int value) {
+      return (BucketHashUtil.forInteger(value) & Integer.MAX_VALUE) % numBuckets;
+    }
+
+    public BucketInt(DataType sqlType) {
+      this.sqlType = sqlType;
+    }
+
+    @Override
+    public Integer produceResult(InternalRow input) {
+      // return null for null input to match what Spark does in the code-generated versions.
+      return input.isNullAt(NUM_BUCKETS_ORDINAL) || input.isNullAt(VALUE_ORDINAL)
+          ? null
+          : invoke(input.getInt(NUM_BUCKETS_ORDINAL), input.getInt(VALUE_ORDINAL));
+    }
+
+    @Override
+    public DataType[] inputTypes() {
+      return new DataType[] {DataTypes.IntegerType, sqlType};
+    }
+
+    @Override
+    public String canonicalName() {
+      return String.format("iceberg.bucket(%s)", sqlType.catalogString());
+    }
+  }
+
+  // Used for both BigInt and Timestamp
+  public static class BucketLong extends BucketBase<Long> {
+    private final DataType sqlType;
+
+    // magic function for usage with codegen - needs to be static
+    public static int invoke(int numBuckets, long value) {
+      return (BucketHashUtil.forLong(value) & Integer.MAX_VALUE) % numBuckets;
+    }
+
+    public BucketLong(DataType sqlType) {
+      this.sqlType = sqlType;
+    }
+
+    @Override
+    public DataType[] inputTypes() {
+      return new DataType[] {DataTypes.IntegerType, sqlType};
+    }
+
+    @Override
+    public Integer produceResult(InternalRow input) {
+      // return null for null input to match what Spark does in the code-generated versions.
+      return input.isNullAt(NUM_BUCKETS_ORDINAL) || input.isNullAt(VALUE_ORDINAL)
+          ? null
+          : invoke(input.getInt(NUM_BUCKETS_ORDINAL), input.getLong(VALUE_ORDINAL));
+    }
+
+    @Override
+    public String canonicalName() {
+      return String.format("iceberg.bucket(%s)", sqlType.catalogString());
+    }
+  }
+
+  // bucketing by Float is not allowed by the spec, but this has the float hash implementation
+  public static class BucketFloat extends BucketBase<Float> {
+
+    public static int invoke(int numBuckets, float value) {
+      return (BucketHashUtil.forFloat(value) & Integer.MAX_VALUE) % numBuckets;
+    }
+
+    @Override
+    public DataType[] inputTypes() {
+      return new DataType[] {DataTypes.IntegerType, DataTypes.FloatType};
+    }
+
+    @Override
+    public String canonicalName() {
+      return "iceberg.bucket(float)";
+    }
+
+    @Override
+    public Integer produceResult(InternalRow input) {
+      // return null for null input to match what Spark does in the code-generated versions.
+      return input.isNullAt(NUM_BUCKETS_ORDINAL) || input.isNullAt(VALUE_ORDINAL)
+          ? null
+          : invoke(input.getInt(NUM_BUCKETS_ORDINAL), input.getFloat(VALUE_ORDINAL));
+    }
+  }
+
+  public static class BucketString extends BucketBase<UTF8String> {
+    // magic function for usage with codegen
+    public static Integer invoke(int numBuckets, UTF8String value) {
+      if (value == null) {
+        return null;
+      }
+
+      return (BucketHashUtil.forCharSequence(value.toString()) & Integer.MAX_VALUE) % numBuckets;
+    }
+
+    @Override
+    public DataType[] inputTypes() {
+      return new DataType[] {DataTypes.IntegerType, DataTypes.StringType};
+    }
+
+    @Override
+    public String canonicalName() {
+      return "iceberg.bucket(string)";
+    }
+
+    @Override
+    public Integer produceResult(InternalRow input) {
+      // return null for null input to match what Spark does in the code-generated versions.
+      return input.isNullAt(NUM_BUCKETS_ORDINAL) || input.isNullAt(VALUE_ORDINAL)
+          ? null
+          : invoke(input.getInt(NUM_BUCKETS_ORDINAL), input.getUTF8String(VALUE_ORDINAL));
+    }
+  }

Review Comment:
   For some reason, when `produceResult` was defined in the superclass, Spark complained that it was not defined and errored out.
   
   So I've moved all of the logic to each subclass.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a diff in pull request #5513: Spark 3.3 - Support bucket in FunctionCatalog

Posted by GitBox <gi...@apache.org>.
rdblue commented on code in PR #5513:
URL: https://github.com/apache/iceberg/pull/5513#discussion_r945205510


##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/functions/BucketFunction.java:
##########
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.functions;
+
+import java.nio.ByteBuffer;
+import java.util.Set;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
+import org.apache.iceberg.util.BucketUtil;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.connector.catalog.functions.BoundFunction;
+import org.apache.spark.sql.connector.catalog.functions.ScalarFunction;
+import org.apache.spark.sql.connector.catalog.functions.UnboundFunction;
+import org.apache.spark.sql.types.BinaryType;
+import org.apache.spark.sql.types.ByteType;
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.DateType;
+import org.apache.spark.sql.types.Decimal;
+import org.apache.spark.sql.types.DecimalType;
+import org.apache.spark.sql.types.IntegerType;
+import org.apache.spark.sql.types.LongType;
+import org.apache.spark.sql.types.ShortType;
+import org.apache.spark.sql.types.StringType;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.types.TimestampType;
+import org.apache.spark.unsafe.types.UTF8String;
+
+/**
+ * A Spark function implementation for the Iceberg bucket transform.
+ *
+ * <p>Example usage: {@code SELECT system.bucket(128, 'abc')}, which returns the bucket 122.
+ *
+ * <p>Note that for performance reasons, the given input number of buckets is not validated in the
+ * implementations used in code-gen. The number of buckets must be positive to give meaningful
+ * results.
+ */
+public class BucketFunction implements UnboundFunction {
+  private static final int NUM_BUCKETS_ORDINAL = 0;
+  private static final int VALUE_ORDINAL = 1;
+  private static final Set<DataType> SUPPORTED_NUM_BUCKETS_TYPES =
+      ImmutableSet.of(DataTypes.ByteType, DataTypes.ShortType, DataTypes.IntegerType);
+
+  @Override
+  public BoundFunction bind(StructType inputType) {
+    if (inputType.size() != 2) {
+      throw new UnsupportedOperationException(
+          "Wrong number of inputs (expected numBuckets and value)");
+    }
+
+    StructField numBucketsField = inputType.fields()[NUM_BUCKETS_ORDINAL];
+    StructField valueField = inputType.fields()[VALUE_ORDINAL];
+
+    if (!SUPPORTED_NUM_BUCKETS_TYPES.contains(numBucketsField.dataType())) {
+      throw new UnsupportedOperationException(
+          "Expected number of buckets to be tinyint, shortint or int");
+    }
+
+    // TODO - Support UUID type.
+    DataType type = valueField.dataType();
+    if (type instanceof DateType) {
+      return new BucketInt(DataTypes.DateType);
+    } else if (type instanceof ByteType
+        || type instanceof ShortType
+        || type instanceof IntegerType) {
+      return new BucketInt(DataTypes.IntegerType);
+    } else if (type instanceof LongType) {
+      return new BucketLong(DataTypes.LongType);
+    } else if (type instanceof TimestampType) {
+      return new BucketLong(DataTypes.TimestampType);
+    } else if (type instanceof DecimalType) {
+      return new BucketDecimal(((DecimalType) type).precision(), ((DecimalType) type).scale());
+    } else if (type instanceof StringType) {
+      return new BucketString();
+    } else if (type instanceof BinaryType) {
+      return new BucketBinary();
+    } else {
+      throw new UnsupportedOperationException(
+          "Expected bucketed column to be date, tinyint, smallint, int, bigint, decimal, timestamp, string, or binary");
+    }
+  }
+
+  @Override
+  public String description() {
+    return name()
+        + "(numBuckets, col) - Call Iceberg's bucket transform\n"
+        + "  numBuckets :: number of buckets to divide the rows into, e.g. bucket(100, 34) -> 79 (must be a tinyint, smallint, or int)\n"
+        + "  col :: column to bucket (must be a date, integer, long, timestamp, decimal, string, or binary)";
+  }
+
+  @Override
+  public String name() {
+    return "bucket";
+  }
+
+  public abstract static class BucketBase implements ScalarFunction<Integer> {
+    @Override
+    public String name() {
+      return "bucket";
+    }
+
+    @Override
+    public DataType resultType() {
+      return DataTypes.IntegerType;
+    }
+  }
+
+  // Used for both int and date - tinyint and smallint are upcasted to int by Spark.
+  public static class BucketInt extends BucketBase {
+    private final DataType sqlType;
+
+    // magic method used in codegen
+    public static int invoke(int numBuckets, int value) {
+      return (BucketUtil.hashInteger(value) & Integer.MAX_VALUE) % numBuckets;

Review Comment:
   Can you move the mask and mod operation to a static helper method? I don't think that we want to copy that in each implementation.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a diff in pull request #5513: Spark 3.3 - Support bucket in FunctionCatalog

Posted by GitBox <gi...@apache.org>.
rdblue commented on code in PR #5513:
URL: https://github.com/apache/iceberg/pull/5513#discussion_r945205924


##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/functions/BucketFunction.java:
##########
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.functions;
+
+import java.nio.ByteBuffer;
+import java.util.Set;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
+import org.apache.iceberg.util.BucketUtil;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.connector.catalog.functions.BoundFunction;
+import org.apache.spark.sql.connector.catalog.functions.ScalarFunction;
+import org.apache.spark.sql.connector.catalog.functions.UnboundFunction;
+import org.apache.spark.sql.types.BinaryType;
+import org.apache.spark.sql.types.ByteType;
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.DateType;
+import org.apache.spark.sql.types.Decimal;
+import org.apache.spark.sql.types.DecimalType;
+import org.apache.spark.sql.types.IntegerType;
+import org.apache.spark.sql.types.LongType;
+import org.apache.spark.sql.types.ShortType;
+import org.apache.spark.sql.types.StringType;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.types.TimestampType;
+import org.apache.spark.unsafe.types.UTF8String;
+
+/**
+ * A Spark function implementation for the Iceberg bucket transform.
+ *
+ * <p>Example usage: {@code SELECT system.bucket(128, 'abc')}, which returns the bucket 122.
+ *
+ * <p>Note that for performance reasons, the given input number of buckets is not validated in the
+ * implementations used in code-gen. The number of buckets must be positive to give meaningful
+ * results.
+ */
+public class BucketFunction implements UnboundFunction {
+  private static final int NUM_BUCKETS_ORDINAL = 0;
+  private static final int VALUE_ORDINAL = 1;
+  private static final Set<DataType> SUPPORTED_NUM_BUCKETS_TYPES =
+      ImmutableSet.of(DataTypes.ByteType, DataTypes.ShortType, DataTypes.IntegerType);
+
+  @Override
+  public BoundFunction bind(StructType inputType) {
+    if (inputType.size() != 2) {
+      throw new UnsupportedOperationException(
+          "Wrong number of inputs (expected numBuckets and value)");
+    }
+
+    StructField numBucketsField = inputType.fields()[NUM_BUCKETS_ORDINAL];
+    StructField valueField = inputType.fields()[VALUE_ORDINAL];
+
+    if (!SUPPORTED_NUM_BUCKETS_TYPES.contains(numBucketsField.dataType())) {
+      throw new UnsupportedOperationException(
+          "Expected number of buckets to be tinyint, shortint or int");
+    }
+
+    // TODO - Support UUID type.
+    DataType type = valueField.dataType();
+    if (type instanceof DateType) {
+      return new BucketInt(DataTypes.DateType);
+    } else if (type instanceof ByteType
+        || type instanceof ShortType
+        || type instanceof IntegerType) {
+      return new BucketInt(DataTypes.IntegerType);
+    } else if (type instanceof LongType) {
+      return new BucketLong(DataTypes.LongType);
+    } else if (type instanceof TimestampType) {
+      return new BucketLong(DataTypes.TimestampType);

Review Comment:
   What about time? I think that's missing.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a diff in pull request #5513: Spark 3.3 - Support bucket in FunctionCatalog

Posted by GitBox <gi...@apache.org>.
rdblue commented on code in PR #5513:
URL: https://github.com/apache/iceberg/pull/5513#discussion_r944758537


##########
api/src/main/java/org/apache/iceberg/util/BucketHashUtil.java:
##########
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.util;
+
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.UUID;
+import org.apache.iceberg.relocated.com.google.common.hash.HashFunction;
+import org.apache.iceberg.relocated.com.google.common.hash.Hashing;
+
+/**
+ * Contains the logic for hashing various types for use with the {@code bucket} partition
+ * transformations
+ */
+public class BucketHashUtil {
+
+  private static final HashFunction MURMUR3 = Hashing.murmur3_32_fixed();
+
+  private BucketHashUtil() {}
+
+  public static int forInteger(Integer value) {

Review Comment:
   I would normally expect `for` to signal a method that will return something that I use, rather than doing something directly. Since these are all hash methods, what about naming them `hash`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a diff in pull request #5513: Spark 3.3 - Support bucket in FunctionCatalog

Posted by GitBox <gi...@apache.org>.
rdblue commented on code in PR #5513:
URL: https://github.com/apache/iceberg/pull/5513#discussion_r944758882


##########
api/src/main/java/org/apache/iceberg/util/BucketHashUtil.java:
##########
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.util;
+
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.UUID;
+import org.apache.iceberg.relocated.com.google.common.hash.HashFunction;
+import org.apache.iceberg.relocated.com.google.common.hash.Hashing;
+
+/**
+ * Contains the logic for hashing various types for use with the {@code bucket} partition
+ * transformations
+ */
+public class BucketHashUtil {

Review Comment:
   How about `BucketUtil` instead?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a diff in pull request #5513: Spark 3.3 - Support bucket in FunctionCatalog

Posted by GitBox <gi...@apache.org>.
rdblue commented on code in PR #5513:
URL: https://github.com/apache/iceberg/pull/5513#discussion_r945206468


##########
spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/sql/TestSparkBucketFunction.java:
##########
@@ -0,0 +1,330 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.sql;
+
+import java.nio.charset.StandardCharsets;
+import org.apache.iceberg.AssertHelpers;
+import org.apache.iceberg.relocated.com.google.common.io.BaseEncoding;
+import org.apache.iceberg.spark.SparkTestBaseWithCatalog;
+import org.apache.spark.sql.AnalysisException;
+import org.assertj.core.api.Assertions;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestSparkBucketFunction extends SparkTestBaseWithCatalog {
+  public TestSparkBucketFunction() {}
+
+  @Before
+  public void useCatalog() {
+    sql("USE %s", catalogName);
+  }
+
+  @Test
+  public void testBucketIntegers() {
+    Assert.assertEquals(
+        "Byte type should bucket similarly to integer",
+        3,
+        scalarSql("SELECT system.bucket(10, 8Y)"));
+    Assert.assertEquals(
+        "Short type should bucket similarly to integer",
+        3,
+        scalarSql("SELECT system.bucket(10, 8S)"));
+    // Integers
+    Assert.assertEquals(3, scalarSql("SELECT system.bucket(10, 8)"));
+    Assert.assertEquals(79, scalarSql("SELECT system.bucket(100, 34)"));
+    Assert.assertNull(scalarSql("SELECT system.bucket(1, CAST(null AS INT))"));
+  }
+
+  @Test
+  public void testBucketDates() {
+    Assert.assertEquals(3, scalarSql("SELECT system.bucket(10, date('1970-01-09'))"));
+    Assert.assertEquals(79, scalarSql("SELECT system.bucket(100, date('1970-02-04'))"));
+    Assert.assertNull(scalarSql("SELECT system.bucket(1, CAST(null AS DATE))"));
+  }
+
+  @Test
+  public void testBucketLong() {
+    Assert.assertEquals(79, scalarSql("SELECT system.bucket(100, 34L)"));
+    Assert.assertEquals(76, scalarSql("SELECT system.bucket(100, 0L)"));
+    Assert.assertEquals(97, scalarSql("SELECT system.bucket(100, -34L)"));
+    Assert.assertEquals(0, scalarSql("SELECT system.bucket(2, -1L)"));
+    Assert.assertNull(scalarSql("SELECT system.bucket(2, CAST(null AS LONG))"));
+  }
+
+  @Test
+  public void testBucketDecimal() {
+    Assert.assertEquals(56, scalarSql("SELECT system.bucket(64, CAST('12.34' as DECIMAL(9, 2)))"));
+    Assert.assertEquals(13, scalarSql("SELECT system.bucket(18, CAST('12.30' as DECIMAL(9, 2)))"));
+    Assert.assertEquals(2, scalarSql("SELECT system.bucket(16, CAST('12.999' as DECIMAL(9, 3)))"));
+    Assert.assertEquals(21, scalarSql("SELECT system.bucket(32, CAST('0.05' as DECIMAL(5, 2)))"));
+    Assert.assertEquals(85, scalarSql("SELECT system.bucket(128, CAST('0.05' as DECIMAL(9, 2)))"));
+    Assert.assertEquals(3, scalarSql("SELECT system.bucket(18, CAST('0.05' as DECIMAL(9, 2)))"));
+
+    Assert.assertNull(
+        "Null input should return null",
+        scalarSql("SELECT system.bucket(2, CAST(null AS decimal))"));
+  }
+
+  @Test
+  public void testBucketTimestamp() {
+    Assert.assertEquals(
+        99, scalarSql("SELECT system.bucket(100, TIMESTAMP '1997-01-01 00:00:00 UTC+00:00')"));
+    Assert.assertEquals(
+        85, scalarSql("SELECT system.bucket(100, TIMESTAMP '1997-01-31 09:26:56 UTC+00:00')"));
+    Assert.assertEquals(
+        62, scalarSql("SELECT system.bucket(100, TIMESTAMP '2022-08-08 00:00:00 UTC+00:00')"));
+    Assert.assertNull(scalarSql("SELECT system.bucket(2, CAST(null AS timestamp))"));
+  }
+
+  @Test
+  public void testBucketString() {
+    Assert.assertEquals(4, scalarSql("SELECT system.bucket(5, 'abcdefg')"));
+    Assert.assertEquals(122, scalarSql("SELECT system.bucket(128, 'abc')"));
+    Assert.assertEquals(54, scalarSql("SELECT system.bucket(64, 'abcde')"));
+    Assert.assertEquals(8, scalarSql("SELECT system.bucket(12, '测试')"));
+    Assert.assertEquals(1, scalarSql("SELECT system.bucket(16, '测试raul试测')"));
+    Assert.assertEquals(
+        "Varchar should work like string",
+        1,
+        scalarSql("SELECT system.bucket(16, CAST('测试raul试测' AS varchar(8)))"));
+    Assert.assertEquals(
+        "Char should work like string",
+        1,
+        scalarSql("SELECT system.bucket(16, CAST('测试raul试测' AS char(8)))"));
+    Assert.assertEquals(
+        "Should not fail on the empty string", 0, scalarSql("SELECT system.bucket(16, '')"));
+    Assert.assertNull(
+        "Null input should return null as output",
+        scalarSql("SELECT system.bucket(16, CAST(null AS string))"));
+  }
+
+  @Test
+  public void testBucketBinary() {
+    Assert.assertEquals(
+        1, scalarSql("SELECT system.bucket(10, X'0102030405060708090a0b0c0d0e0f')"));
+    Assert.assertEquals(10, scalarSql("SELECT system.bucket(12, %s)", asBytesLiteral("abcdefg")));
+    Assert.assertEquals(13, scalarSql("SELECT system.bucket(18, %s)", asBytesLiteral("abc\0\0")));
+    Assert.assertEquals(42, scalarSql("SELECT system.bucket(48, %s)", asBytesLiteral("abc")));
+    Assert.assertEquals(3, scalarSql("SELECT system.bucket(16, %s)", asBytesLiteral("测试_")));
+
+    Assert.assertNull(
+        "Null input should return null as output",
+        scalarSql("SELECT system.bucket(100, CAST(null AS binary))"));
+  }
+
+  @Test
+  public void testNumBucketsAcceptsShortAndByte() {
+    Assert.assertEquals(
+        "Short types should be usable for the number of buckets field",
+        1,
+        scalarSql("SELECT system.bucket(5S, 1L)"));
+
+    Assert.assertEquals(
+        "Byte types should be allowed for the number of buckets field",
+        1,
+        scalarSql("SELECT system.bucket(5Y, 1)"));
+  }
+
+  @Test
+  public void testWrongNumberOfArguments() {
+    AssertHelpers.assertThrows(
+        "Function resolution should not work with zero arguments",
+        AnalysisException.class,
+        "Function 'bucket' cannot process input: (): Wrong number of inputs (expected numBuckets and value)",
+        () -> scalarSql("SELECT system.bucket()"));
+
+    AssertHelpers.assertThrows(
+        "Function resolution should not work with only one argument",
+        AnalysisException.class,
+        "Function 'bucket' cannot process input: (int): Wrong number of inputs (expected numBuckets and value)",
+        () -> scalarSql("SELECT system.bucket(1)"));
+
+    AssertHelpers.assertThrows(
+        "Function resolution should not work with more than two arguments",
+        AnalysisException.class,
+        "Function 'bucket' cannot process input: (int, bigint, int): Wrong number of inputs (expected numBuckets and value)",
+        () -> scalarSql("SELECT system.bucket(1, 1L, 1)"));
+  }
+
+  @Test
+  public void testInvalidTypesCannotBeUsedForNumberOfBuckets() {
+    AssertHelpers.assertThrows(
+        "Decimal type should not be coercible to the number of buckets",
+        AnalysisException.class,
+        "Function 'bucket' cannot process input: (decimal(9,2), int): Expected number of buckets to be tinyint, shortint or int",
+        () -> scalarSql("SELECT system.bucket(CAST('12.34' as DECIMAL(9, 2)), 10)"));
+
+    AssertHelpers.assertThrows(
+        "Long type should not be coercible to the number of buckets",
+        AnalysisException.class,
+        "Function 'bucket' cannot process input: (bigint, int): Expected number of buckets to be tinyint, shortint or int",
+        () -> scalarSql("SELECT system.bucket(12L, 10)"));
+
+    AssertHelpers.assertThrows(
+        "String type should not be coercible to the number of buckets",
+        AnalysisException.class,
+        "Function 'bucket' cannot process input: (string, int): Expected number of buckets to be tinyint, shortint or int",
+        () -> scalarSql("SELECT system.bucket('5', 10)"));
+
+    AssertHelpers.assertThrows(
+        "Interval year to month  type should not be coercible to the number of buckets",
+        AnalysisException.class,
+        "Function 'bucket' cannot process input: (interval year to month, int): Expected number of buckets to be tinyint, shortint or int",
+        () -> scalarSql("SELECT system.bucket(INTERVAL '100-00' YEAR TO MONTH, 10)"));
+
+    AssertHelpers.assertThrows(
+        "Interval day-time type should not be coercible to the number of buckets",
+        AnalysisException.class,
+        "Function 'bucket' cannot process input: (interval day to second, int): Expected number of buckets to be tinyint, shortint or int",
+        () -> scalarSql("SELECT system.bucket(CAST('11 23:4:0' AS INTERVAL DAY TO SECOND), 10)"));
+  }
+
+  @Test
+  public void testInvalidTypesForBucketColumn() {
+    AssertHelpers.assertThrows(
+        "Double type should not be bucketable",
+        AnalysisException.class,
+        "Function 'bucket' cannot process input: (int, float): Expected bucketed column to be date, tinyint, smallint, int, bigint, decimal, timestamp, string, or binary",
+        () -> scalarSql("SELECT system.bucket(10, cast(12.3456 as float))"));
+
+    AssertHelpers.assertThrows(
+        "Double type should not be bucketable",
+        AnalysisException.class,
+        "Function 'bucket' cannot process input: (int, double): Expected bucketed column to be date, tinyint, smallint, int, bigint, decimal, timestamp, string, or binary",
+        () -> scalarSql("SELECT system.bucket(10, cast(12.3456 as double))"));
+
+    AssertHelpers.assertThrows(
+        "Boolean type should not be bucketable",
+        AnalysisException.class,
+        "Function 'bucket' cannot process input: (int, boolean)",
+        () -> scalarSql("SELECT system.bucket(10, true)"));
+
+    AssertHelpers.assertThrows(
+        "Map types should not be bucketable",
+        AnalysisException.class,
+        "Function 'bucket' cannot process input: (int, map<int,int>)",
+        () -> scalarSql("SELECT system.bucket(10, map(1, 1))"));
+
+    AssertHelpers.assertThrows(
+        "Array types should not be bucketable",
+        AnalysisException.class,
+        "Function 'bucket' cannot process input: (int, array<bigint>)",
+        () -> scalarSql("SELECT system.bucket(10, array(1L))"));
+
+    AssertHelpers.assertThrows(
+        "Interval year-to-month type should not be bucketable",
+        AnalysisException.class,
+        "Function 'bucket' cannot process input: (int, interval year to month)",
+        () -> scalarSql("SELECT system.bucket(10, INTERVAL '100-00' YEAR TO MONTH)"));
+
+    AssertHelpers.assertThrows(
+        "Interval day-time type should not be bucketable",
+        AnalysisException.class,
+        "Function 'bucket' cannot process input: (int, interval day to second)",
+        () -> scalarSql("SELECT system.bucket(10, CAST('11 23:4:0' AS INTERVAL DAY TO SECOND))"));
+  }
+
+  @Test
+  public void testMagicFunctionsResolveForTinyIntAndSmallIntNumberOfBuckets() {
+    // Magic functions have staticinvoke in the explain output.
+    // Nonmagic calls use applyfunctionexpression instead and go through produceResults.
+    String tinyIntWidthExplain = (String) scalarSql("EXPLAIN EXTENDED SELECT system.bucket(1Y, 6)");
+    Assertions.assertThat(tinyIntWidthExplain)
+        .contains("cast(1 as int)")
+        .contains("staticinvoke(class org.apache.iceberg.spark.functions.BucketFunction$BucketInt");

Review Comment:
   I would rather validate that calling this works and gives the expected answer than try to parse the `EXPLAIN` output. That output may change, but the function result (and success) should not.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] kbendick commented on a diff in pull request #5513: Spark 3.3 - Support bucket in FunctionCatalog

Posted by GitBox <gi...@apache.org>.
kbendick commented on code in PR #5513:
URL: https://github.com/apache/iceberg/pull/5513#discussion_r945207438


##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/functions/BucketFunction.java:
##########
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.functions;
+
+import java.nio.ByteBuffer;
+import java.util.Set;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
+import org.apache.iceberg.util.BucketUtil;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.connector.catalog.functions.BoundFunction;
+import org.apache.spark.sql.connector.catalog.functions.ScalarFunction;
+import org.apache.spark.sql.connector.catalog.functions.UnboundFunction;
+import org.apache.spark.sql.types.BinaryType;
+import org.apache.spark.sql.types.ByteType;
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.DateType;
+import org.apache.spark.sql.types.Decimal;
+import org.apache.spark.sql.types.DecimalType;
+import org.apache.spark.sql.types.IntegerType;
+import org.apache.spark.sql.types.LongType;
+import org.apache.spark.sql.types.ShortType;
+import org.apache.spark.sql.types.StringType;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.types.TimestampType;
+import org.apache.spark.unsafe.types.UTF8String;
+
+/**
+ * A Spark function implementation for the Iceberg bucket transform.
+ *
+ * <p>Example usage: {@code SELECT system.bucket(128, 'abc')}, which returns the bucket 122.
+ *
+ * <p>Note that for performance reasons, the given input number of buckets is not validated in the
+ * implementations used in code-gen. The number of buckets must be positive to give meaningful
+ * results.
+ */
+public class BucketFunction implements UnboundFunction {
+  private static final int NUM_BUCKETS_ORDINAL = 0;
+  private static final int VALUE_ORDINAL = 1;
+  private static final Set<DataType> SUPPORTED_NUM_BUCKETS_TYPES =
+      ImmutableSet.of(DataTypes.ByteType, DataTypes.ShortType, DataTypes.IntegerType);
+
+  @Override
+  public BoundFunction bind(StructType inputType) {
+    if (inputType.size() != 2) {
+      throw new UnsupportedOperationException(
+          "Wrong number of inputs (expected numBuckets and value)");
+    }
+
+    StructField numBucketsField = inputType.fields()[NUM_BUCKETS_ORDINAL];
+    StructField valueField = inputType.fields()[VALUE_ORDINAL];
+
+    if (!SUPPORTED_NUM_BUCKETS_TYPES.contains(numBucketsField.dataType())) {
+      throw new UnsupportedOperationException(
+          "Expected number of buckets to be tinyint, shortint or int");
+    }
+
+    // TODO - Support UUID type.
+    DataType type = valueField.dataType();
+    if (type instanceof DateType) {
+      return new BucketInt(DataTypes.DateType);
+    } else if (type instanceof ByteType
+        || type instanceof ShortType
+        || type instanceof IntegerType) {
+      return new BucketInt(DataTypes.IntegerType);
+    } else if (type instanceof LongType) {
+      return new BucketLong(DataTypes.LongType);
+    } else if (type instanceof TimestampType) {
+      return new BucketLong(DataTypes.TimestampType);
+    } else if (type instanceof DecimalType) {
+      return new BucketDecimal(((DecimalType) type).precision(), ((DecimalType) type).scale());
+    } else if (type instanceof StringType) {
+      return new BucketString();
+    } else if (type instanceof BinaryType) {
+      return new BucketBinary();
+    } else {
+      throw new UnsupportedOperationException(
+          "Expected bucketed column to be date, tinyint, smallint, int, bigint, decimal, timestamp, string, or binary");
+    }
+  }
+
+  @Override
+  public String description() {
+    return name()
+        + "(numBuckets, col) - Call Iceberg's bucket transform\n"
+        + "  numBuckets :: number of buckets to divide the rows into, e.g. bucket(100, 34) -> 79 (must be a tinyint, smallint, or int)\n"
+        + "  col :: column to bucket (must be a date, integer, long, timestamp, decimal, string, or binary)";
+  }
+
+  @Override
+  public String name() {
+    return "bucket";
+  }
+
+  public abstract static class BucketBase implements ScalarFunction<Integer> {
+    @Override
+    public String name() {
+      return "bucket";
+    }
+
+    @Override
+    public DataType resultType() {
+      return DataTypes.IntegerType;
+    }
+  }
+
+  // Used for both int and date - tinyint and smallint are upcasted to int by Spark.
+  public static class BucketInt extends BucketBase {
+    private final DataType sqlType;
+
+    // magic method used in codegen
+    public static int invoke(int numBuckets, int value) {
+      return (BucketUtil.hashInteger(value) & Integer.MAX_VALUE) % numBuckets;

Review Comment:
   Yeah, I believe so. In an earlier design I had the subclasses using a generic (templated) type, but too many things needed "static" copies because static members cannot use a class's type parameters.
   
   I encountered some issues in a previous design with Spark not sending the generic `produceResult` function definition to executors when it was in the base class, but I think with a static function that shouldn't happen.
   
   Will try to make it work tonight now that there's no template type.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a diff in pull request #5513: Spark 3.3 - Support bucket in FunctionCatalog

Posted by GitBox <gi...@apache.org>.
rdblue commented on code in PR #5513:
URL: https://github.com/apache/iceberg/pull/5513#discussion_r945205142


##########
api/src/main/java/org/apache/iceberg/util/BucketUtil.java:
##########
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.util;
+
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.UUID;
+import org.apache.iceberg.relocated.com.google.common.hash.HashFunction;
+import org.apache.iceberg.relocated.com.google.common.hash.Hashing;
+
+/**
+ * Contains the logic for hashing various types for use with the {@code bucket} partition
+ * transformations
+ */
+public class BucketUtil {
+
+  private static final HashFunction MURMUR3 = Hashing.murmur3_32_fixed();
+
+  private BucketUtil() {}
+
+  public static int hashInteger(Integer value) {
+    return MURMUR3.hashLong(value.longValue()).asInt();
+  }
+
+  public static int hashLong(Long value) {
+    return MURMUR3.hashLong(value).asInt();
+  }
+
+  public static int hashFloat(Float value) {
+    return MURMUR3.hashLong(Double.doubleToLongBits((double) value)).asInt();
+  }
+
+  public static int hashDouble(Double value) {
+    return MURMUR3.hashLong(Double.doubleToLongBits(value)).asInt();
+  }
+
+  public static int hashCharSequence(CharSequence value) {
+    return MURMUR3.hashString(value, StandardCharsets.UTF_8).asInt();
+  }
+
+  public static int hashByteBuffer(ByteBuffer value) {
+    if (value.hasArray()) {
+      return MURMUR3
+          .hashBytes(
+              value.array(),
+              value.arrayOffset() + value.position(),
+              value.arrayOffset() + value.remaining())

Review Comment:
   I just realized that this isn't correct. It has been wrong for years, evidently.
   
   [`HashFunction.hashBytes`](https://guava.dev/releases/19.0/api/docs/com/google/common/hash/HashFunction.html#hashBytes(byte[],%20int,%20int)) accepts an offset and a length, not an end index. `value.remaining()` alone is that length, so `value.arrayOffset()` should not be added to it.
   
   It looks like this was working because `arrayOffset` was never non-zero. That's because the only way to get a `ByteBuffer` with a non-zero `arrayOffset` (as far as I can tell) is to use `ByteBuffer.slice()`, which creates a copy of the `ByteBuffer` and sets its `arrayOffset`. Since `slice` doesn't allow setting the `position` and `limit`, every caller that I've been able to find uses `duplicate()` and then sets `position` and `limit`, because there's no need to limit the start or capacity of the `ByteBuffer` when the backing array is not limited.
   
   Here's a test that catches this. @kbendick, can you add this to `TestBucketing` along with the fix here (remove `value.arrayOffset()` from the length argument, keeping it in the offset argument)?
   
   ```java
     @Test
     public void testByteBufferOnHeapArrayOffset() {
       byte[] bytes = randomBytes(128);
       ByteBuffer raw = ByteBuffer.wrap(bytes, 5, 100);
       ByteBuffer buffer = raw.slice();
       Assert.assertEquals("Buffer arrayOffset should be 5", 5, buffer.arrayOffset());
   
       Bucket<ByteBuffer> bucketFunc = Bucket.get(Types.BinaryType.get(), 100);
   
       Assert.assertEquals(
           "HeapByteBuffer hash should match hash for correct slice",
           hashBytes(bytes, 5, 100),
           bucketFunc.hash(buffer));
   
       // verify that the buffer was not modified
       Assert.assertEquals("Buffer position should be 0", 0, buffer.position());
       Assert.assertEquals("Buffer limit should not change", 100, buffer.limit());
     }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] kbendick commented on a diff in pull request #5513: Spark 3.3 - Support bucket in FunctionCatalog

Posted by GitBox <gi...@apache.org>.
kbendick commented on code in PR #5513:
URL: https://github.com/apache/iceberg/pull/5513#discussion_r946033351


##########
spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/sql/TestSparkBucketFunction.java:
##########
@@ -0,0 +1,330 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.sql;
+
+import java.nio.charset.StandardCharsets;
+import org.apache.iceberg.AssertHelpers;
+import org.apache.iceberg.relocated.com.google.common.io.BaseEncoding;
+import org.apache.iceberg.spark.SparkTestBaseWithCatalog;
+import org.apache.spark.sql.AnalysisException;
+import org.assertj.core.api.Assertions;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestSparkBucketFunction extends SparkTestBaseWithCatalog {
+  public TestSparkBucketFunction() {}
+
+  @Before
+  public void useCatalog() {
+    sql("USE %s", catalogName);
+  }
+
+  @Test
+  public void testBucketIntegers() {
+    Assert.assertEquals(
+        "Byte type should bucket similarly to integer",
+        3,
+        scalarSql("SELECT system.bucket(10, 8Y)"));
+    Assert.assertEquals(
+        "Short type should bucket similarly to integer",
+        3,
+        scalarSql("SELECT system.bucket(10, 8S)"));
+    // Integers
+    Assert.assertEquals(3, scalarSql("SELECT system.bucket(10, 8)"));
+    Assert.assertEquals(79, scalarSql("SELECT system.bucket(100, 34)"));
+    Assert.assertNull(scalarSql("SELECT system.bucket(1, CAST(null AS INT))"));
+  }
+
+  @Test
+  public void testBucketDates() {
+    Assert.assertEquals(3, scalarSql("SELECT system.bucket(10, date('1970-01-09'))"));
+    Assert.assertEquals(79, scalarSql("SELECT system.bucket(100, date('1970-02-04'))"));
+    Assert.assertNull(scalarSql("SELECT system.bucket(1, CAST(null AS DATE))"));
+  }
+
+  @Test
+  public void testBucketLong() {
+    Assert.assertEquals(79, scalarSql("SELECT system.bucket(100, 34L)"));
+    Assert.assertEquals(76, scalarSql("SELECT system.bucket(100, 0L)"));
+    Assert.assertEquals(97, scalarSql("SELECT system.bucket(100, -34L)"));
+    Assert.assertEquals(0, scalarSql("SELECT system.bucket(2, -1L)"));
+    Assert.assertNull(scalarSql("SELECT system.bucket(2, CAST(null AS LONG))"));
+  }
+
+  @Test
+  public void testBucketDecimal() {
+    Assert.assertEquals(56, scalarSql("SELECT system.bucket(64, CAST('12.34' as DECIMAL(9, 2)))"));
+    Assert.assertEquals(13, scalarSql("SELECT system.bucket(18, CAST('12.30' as DECIMAL(9, 2)))"));
+    Assert.assertEquals(2, scalarSql("SELECT system.bucket(16, CAST('12.999' as DECIMAL(9, 3)))"));
+    Assert.assertEquals(21, scalarSql("SELECT system.bucket(32, CAST('0.05' as DECIMAL(5, 2)))"));
+    Assert.assertEquals(85, scalarSql("SELECT system.bucket(128, CAST('0.05' as DECIMAL(9, 2)))"));
+    Assert.assertEquals(3, scalarSql("SELECT system.bucket(18, CAST('0.05' as DECIMAL(9, 2)))"));
+
+    Assert.assertNull(
+        "Null input should return null",
+        scalarSql("SELECT system.bucket(2, CAST(null AS decimal))"));
+  }
+
+  @Test
+  public void testBucketTimestamp() {
+    Assert.assertEquals(
+        99, scalarSql("SELECT system.bucket(100, TIMESTAMP '1997-01-01 00:00:00 UTC+00:00')"));
+    Assert.assertEquals(
+        85, scalarSql("SELECT system.bucket(100, TIMESTAMP '1997-01-31 09:26:56 UTC+00:00')"));
+    Assert.assertEquals(
+        62, scalarSql("SELECT system.bucket(100, TIMESTAMP '2022-08-08 00:00:00 UTC+00:00')"));
+    Assert.assertNull(scalarSql("SELECT system.bucket(2, CAST(null AS timestamp))"));
+  }
+
+  @Test
+  public void testBucketString() {
+    Assert.assertEquals(4, scalarSql("SELECT system.bucket(5, 'abcdefg')"));
+    Assert.assertEquals(122, scalarSql("SELECT system.bucket(128, 'abc')"));
+    Assert.assertEquals(54, scalarSql("SELECT system.bucket(64, 'abcde')"));
+    Assert.assertEquals(8, scalarSql("SELECT system.bucket(12, '测试')"));
+    Assert.assertEquals(1, scalarSql("SELECT system.bucket(16, '测试raul试测')"));
+    Assert.assertEquals(
+        "Varchar should work like string",
+        1,
+        scalarSql("SELECT system.bucket(16, CAST('测试raul试测' AS varchar(8)))"));
+    Assert.assertEquals(
+        "Char should work like string",
+        1,
+        scalarSql("SELECT system.bucket(16, CAST('测试raul试测' AS char(8)))"));
+    Assert.assertEquals(
+        "Should not fail on the empty string", 0, scalarSql("SELECT system.bucket(16, '')"));
+    Assert.assertNull(
+        "Null input should return null as output",
+        scalarSql("SELECT system.bucket(16, CAST(null AS string))"));
+  }
+
+  @Test
+  public void testBucketBinary() {
+    Assert.assertEquals(
+        1, scalarSql("SELECT system.bucket(10, X'0102030405060708090a0b0c0d0e0f')"));
+    Assert.assertEquals(10, scalarSql("SELECT system.bucket(12, %s)", asBytesLiteral("abcdefg")));
+    Assert.assertEquals(13, scalarSql("SELECT system.bucket(18, %s)", asBytesLiteral("abc\0\0")));
+    Assert.assertEquals(42, scalarSql("SELECT system.bucket(48, %s)", asBytesLiteral("abc")));
+    Assert.assertEquals(3, scalarSql("SELECT system.bucket(16, %s)", asBytesLiteral("测试_")));
+
+    Assert.assertNull(
+        "Null input should return null as output",
+        scalarSql("SELECT system.bucket(100, CAST(null AS binary))"));
+  }
+
+  @Test
+  public void testNumBucketsAcceptsShortAndByte() {
+    Assert.assertEquals(
+        "Short types should be usable for the number of buckets field",
+        1,
+        scalarSql("SELECT system.bucket(5S, 1L)"));
+
+    Assert.assertEquals(
+        "Byte types should be allowed for the number of buckets field",
+        1,
+        scalarSql("SELECT system.bucket(5Y, 1)"));
+  }
+
+  @Test
+  public void testWrongNumberOfArguments() {
+    AssertHelpers.assertThrows(
+        "Function resolution should not work with zero arguments",
+        AnalysisException.class,
+        "Function 'bucket' cannot process input: (): Wrong number of inputs (expected numBuckets and value)",
+        () -> scalarSql("SELECT system.bucket()"));
+
+    AssertHelpers.assertThrows(
+        "Function resolution should not work with only one argument",
+        AnalysisException.class,
+        "Function 'bucket' cannot process input: (int): Wrong number of inputs (expected numBuckets and value)",
+        () -> scalarSql("SELECT system.bucket(1)"));
+
+    AssertHelpers.assertThrows(
+        "Function resolution should not work with more than two arguments",
+        AnalysisException.class,
+        "Function 'bucket' cannot process input: (int, bigint, int): Wrong number of inputs (expected numBuckets and value)",
+        () -> scalarSql("SELECT system.bucket(1, 1L, 1)"));
+  }
+
+  // Each case passes a non-integral-width type as the first (numBuckets) argument and expects
+  // an AnalysisException whose message names the offending input types.
+  @Test
+  public void testInvalidTypesCannotBeUsedForNumberOfBuckets() {
+    AssertHelpers.assertThrows(
+        "Decimal type should not be coercible to the number of buckets",
+        AnalysisException.class,
+        "Function 'bucket' cannot process input: (decimal(9,2), int): Expected number of buckets to be tinyint, shortint or int",
+        () -> scalarSql("SELECT system.bucket(CAST('12.34' as DECIMAL(9, 2)), 10)"));
+
+    AssertHelpers.assertThrows(
+        "Long type should not be coercible to the number of buckets",
+        AnalysisException.class,
+        "Function 'bucket' cannot process input: (bigint, int): Expected number of buckets to be tinyint, shortint or int",
+        () -> scalarSql("SELECT system.bucket(12L, 10)"));
+
+    AssertHelpers.assertThrows(
+        "String type should not be coercible to the number of buckets",
+        AnalysisException.class,
+        "Function 'bucket' cannot process input: (string, int): Expected number of buckets to be tinyint, shortint or int",
+        () -> scalarSql("SELECT system.bucket('5', 10)"));
+
+    // Description wording matches the interval cases in testInvalidTypesForBucketColumn.
+    AssertHelpers.assertThrows(
+        "Interval year-to-month type should not be coercible to the number of buckets",
+        AnalysisException.class,
+        "Function 'bucket' cannot process input: (interval year to month, int): Expected number of buckets to be tinyint, shortint or int",
+        () -> scalarSql("SELECT system.bucket(INTERVAL '100-00' YEAR TO MONTH, 10)"));
+
+    AssertHelpers.assertThrows(
+        "Interval day-time type should not be coercible to the number of buckets",
+        AnalysisException.class,
+        "Function 'bucket' cannot process input: (interval day to second, int): Expected number of buckets to be tinyint, shortint or int",
+        () -> scalarSql("SELECT system.bucket(CAST('11 23:4:0' AS INTERVAL DAY TO SECOND), 10)"));
+  }
+
+  // Each case passes an unsupported type as the second (value) argument and expects an
+  // AnalysisException. The float and double cases pin the full message, including the list of
+  // supported types; the remaining cases pin only the leading part that names the input types.
+  @Test
+  public void testInvalidTypesForBucketColumn() {
+    // The value here is cast to float, so the failure description must say Float, not Double.
+    AssertHelpers.assertThrows(
+        "Float type should not be bucketable",
+        AnalysisException.class,
+        "Function 'bucket' cannot process input: (int, float): Expected bucketed column to be date, tinyint, smallint, int, bigint, decimal, timestamp, string, or binary",
+        () -> scalarSql("SELECT system.bucket(10, cast(12.3456 as float))"));
+
+    AssertHelpers.assertThrows(
+        "Double type should not be bucketable",
+        AnalysisException.class,
+        "Function 'bucket' cannot process input: (int, double): Expected bucketed column to be date, tinyint, smallint, int, bigint, decimal, timestamp, string, or binary",
+        () -> scalarSql("SELECT system.bucket(10, cast(12.3456 as double))"));
+
+    AssertHelpers.assertThrows(
+        "Boolean type should not be bucketable",
+        AnalysisException.class,
+        "Function 'bucket' cannot process input: (int, boolean)",
+        () -> scalarSql("SELECT system.bucket(10, true)"));
+
+    AssertHelpers.assertThrows(
+        "Map types should not be bucketable",
+        AnalysisException.class,
+        "Function 'bucket' cannot process input: (int, map<int,int>)",
+        () -> scalarSql("SELECT system.bucket(10, map(1, 1))"));
+
+    AssertHelpers.assertThrows(
+        "Array types should not be bucketable",
+        AnalysisException.class,
+        "Function 'bucket' cannot process input: (int, array<bigint>)",
+        () -> scalarSql("SELECT system.bucket(10, array(1L))"));
+
+    AssertHelpers.assertThrows(
+        "Interval year-to-month type should not be bucketable",
+        AnalysisException.class,
+        "Function 'bucket' cannot process input: (int, interval year to month)",
+        () -> scalarSql("SELECT system.bucket(10, INTERVAL '100-00' YEAR TO MONTH)"));
+
+    AssertHelpers.assertThrows(
+        "Interval day-time type should not be bucketable",
+        AnalysisException.class,
+        "Function 'bucket' cannot process input: (int, interval day to second)",
+        () -> scalarSql("SELECT system.bucket(10, CAST('11 23:4:0' AS INTERVAL DAY TO SECOND))"));
+  }
+
+  @Test
+  public void testMagicFunctionsResolveForTinyIntAndSmallIntNumberOfBuckets() {
+    // Magic functions have staticinvoke in the explain output.
+    // Nonmagic calls use applyfunctionexpression instead and go through produceResults.
+    String tinyIntWidthExplain = (String) scalarSql("EXPLAIN EXTENDED SELECT system.bucket(1Y, 6)");
+    Assertions.assertThat(tinyIntWidthExplain)
+        .contains("cast(1 as int)")
+        .contains("staticinvoke(class org.apache.iceberg.spark.functions.BucketFunction$BucketInt");

Review Comment:
   Removed this test as it's redundant.
   
   We have a test that shortint and tinyint can be used for the `numBuckets` argument, and we have a test that the magic function is statically invoked for integer type, so this is not needed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] kbendick commented on a diff in pull request #5513: Spark 3.3 - Support bucket in FunctionCatalog

Posted by GitBox <gi...@apache.org>.
kbendick commented on code in PR #5513:
URL: https://github.com/apache/iceberg/pull/5513#discussion_r946110749


##########
spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/sql/TestSparkBucketFunction.java:
##########
@@ -0,0 +1,330 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.sql;
+
+import java.nio.charset.StandardCharsets;
+import org.apache.iceberg.AssertHelpers;
+import org.apache.iceberg.relocated.com.google.common.io.BaseEncoding;
+import org.apache.iceberg.spark.SparkTestBaseWithCatalog;
+import org.apache.spark.sql.AnalysisException;
+import org.assertj.core.api.Assertions;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestSparkBucketFunction extends SparkTestBaseWithCatalog {
+  public TestSparkBucketFunction() {}
+
+  // Runs before each test and issues USE against the catalog named by catalogName;
+  // presumably so that the system.bucket calls in the tests resolve in that catalog.
+  @Before
+  public void useCatalog() {
+    sql("USE %s", catalogName);
+  }
+
+  @Test
+  public void testBucketIntegers() {
+    Assert.assertEquals(
+        "Byte type should bucket similarly to integer",
+        3,

Review Comment:
   I added a test suite, testValuesFromSpec, that tests against the hashed values.
   
   I could add more tests against just the transform result now if we want, but we do at least have tests for the hashed transform!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org