You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2022/06/14 17:51:42 UTC

[GitHub] [iceberg] huaxingao opened a new pull request, #5035: Add Parquet Row Group Bloom Filter Support for write path

huaxingao opened a new pull request, #5035:
URL: https://github.com/apache/iceberg/pull/5035

   Co-authored-by: Xi Chen <js...@163.com>
   Co-authored-by: Hao Lin <li...@gmail.com>
   Co-authored-by: Huaxin Gao <hu...@apple.com>
   
   This is the write path of parquet row group bloom filter. The original PR is [here](https://github.com/apache/iceberg/pull/4831)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a diff in pull request #5035: Add Parquet Row Group Bloom Filter Support for write path

Posted by GitBox <gi...@apache.org>.
rdblue commented on code in PR #5035:
URL: https://github.com/apache/iceberg/pull/5035#discussion_r911425368


##########
spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderWithBloomFilter.java:
##########
@@ -0,0 +1,321 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.time.LocalDate;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.iceberg.BaseTable;
+import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Files;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.TestHelpers.Row;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.data.GenericAppenderFactory;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.exceptions.AlreadyExistsException;
+import org.apache.iceberg.hive.HiveCatalog;
+import org.apache.iceberg.hive.TestHiveMetastore;
+import org.apache.iceberg.io.FileAppender;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.spark.SparkValueConverter;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.PropertyUtil;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.SparkSession;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.METASTOREURIS;
+import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT;
+import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT_DEFAULT;
+import static org.apache.iceberg.TableProperties.PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX;
+import static org.apache.iceberg.TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES;
+import static org.apache.iceberg.TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES_DEFAULT;
+
+@RunWith(Parameterized.class)
+public class TestSparkReaderWithBloomFilter {
+
+  protected String tableName = null;
+  protected Table table = null;
+  protected List<Record> records = null;
+  protected DataFile dataFile = null;
+
+  private static TestHiveMetastore metastore = null;
+  protected static SparkSession spark = null;
+  protected static HiveCatalog catalog = null;
+  protected final boolean vectorized;
+  protected final boolean useBloomFilter;
+
+  public TestSparkReaderWithBloomFilter(boolean vectorized, boolean useBloomFilter) {
+    this.vectorized = vectorized;
+    this.useBloomFilter = useBloomFilter;
+  }
+
+  // Schema passed to create tables
+  public static final Schema SCHEMA = new Schema(
+      Types.NestedField.required(1, "id", Types.IntegerType.get()),
+      Types.NestedField.required(2, "id_long", Types.LongType.get()),
+      Types.NestedField.required(3, "id_double", Types.DoubleType.get()),
+      Types.NestedField.required(4, "id_float", Types.FloatType.get()),
+      Types.NestedField.required(5, "id_string", Types.StringType.get()),
+      Types.NestedField.optional(6, "id_boolean", Types.BooleanType.get()),
+      Types.NestedField.optional(7, "id_date", Types.DateType.get()),
+      Types.NestedField.optional(8, "id_int_decimal", Types.DecimalType.of(8, 2)),
+      Types.NestedField.optional(9, "id_long_decimal", Types.DecimalType.of(14, 2)),
+      Types.NestedField.optional(10, "id_fixed_decimal", Types.DecimalType.of(31, 2))
+  );
+
+  private static final int INT_MIN_VALUE = 30;
+  private static final int INT_MAX_VALUE = 329;
+  private static final int INT_VALUE_COUNT = INT_MAX_VALUE - INT_MIN_VALUE + 1;
+  private static final long LONG_BASE = 1000L;
+  private static final double DOUBLE_BASE = 10000D;
+  private static final float FLOAT_BASE = 100000F;
+  private static final String BINARY_PREFIX = "BINARY测试_";
+
+  @Rule
+  public TemporaryFolder temp = new TemporaryFolder();
+
+  @Before
+  public void writeTestDataFile() throws IOException {
+    this.tableName = "test";
+    createTable(tableName, SCHEMA);
+    this.records = Lists.newArrayList();
+
+    // records all use IDs that are in bucket id_bucket=0
+    GenericRecord record = GenericRecord.create(table.schema());
+
+    for (int i = 0; i < INT_VALUE_COUNT; i += 1) {
+      records.add(record.copy(ImmutableMap.of(
+          "id", INT_MIN_VALUE + i,
+          "id_long", LONG_BASE + INT_MIN_VALUE + i,
+          "id_double", DOUBLE_BASE + INT_MIN_VALUE + i,
+          "id_float", FLOAT_BASE + INT_MIN_VALUE + i,
+          "id_string", BINARY_PREFIX + (INT_MIN_VALUE + i),
+          "id_boolean", (i % 2 == 0) ? true : false,
+          "id_date",  LocalDate.parse("2021-09-05"),
+          "id_int_decimal", new BigDecimal(String.valueOf(77.77)),
+          "id_long_decimal", new BigDecimal(String.valueOf(88.88)),
+          "id_fixed_decimal", new BigDecimal(String.valueOf(99.99)))));
+    }
+
+    this.dataFile = writeDataFile(Files.localOutput(temp.newFile()), Row.of(0), records);
+
+    table.newAppend()
+        .appendFile(dataFile)
+        .commit();
+  }
+
+  @After
+  public void cleanup() throws IOException {
+    dropTable("test");
+  }
+
+  @Parameterized.Parameters(name = "vectorized = {0}, useBloomFilter = {1}")
+  public static Object[][] parameters() {
+    return new Object[][] {
+        {false, false}, {true, false}, {false, true}, {true, true}
+    };
+  }
+
+  @BeforeClass
+  public static void startMetastoreAndSpark() {
+    metastore = new TestHiveMetastore();
+    metastore.start();
+    HiveConf hiveConf = metastore.hiveConf();
+
+    spark = SparkSession.builder()
+        .master("local[2]")
+        .config("spark.hadoop." + METASTOREURIS.varname, hiveConf.get(METASTOREURIS.varname))
+        .enableHiveSupport()
+        .getOrCreate();
+
+    catalog = (HiveCatalog)
+        CatalogUtil.loadCatalog(HiveCatalog.class.getName(), "hive", ImmutableMap.of(), hiveConf);
+
+    try {
+      catalog.createNamespace(Namespace.of("default"));
+    } catch (AlreadyExistsException ignored) {
+      // the default namespace already exists. ignore the create error
+    }
+  }
+
+  @AfterClass
+  public static void stopMetastoreAndSpark() throws Exception {
+    catalog = null;
+    metastore.stop();
+    metastore = null;
+    spark.stop();
+    spark = null;
+  }
+
+  protected void createTable(String name, Schema schema) {
+    table = catalog.createTable(TableIdentifier.of("default", name), schema);
+    TableOperations ops = ((BaseTable) table).operations();
+    TableMetadata meta = ops.current();
+    ops.commit(meta, meta.upgradeToFormatVersion(2));
+
+    if (useBloomFilter) {
+      table.updateProperties()
+          .set(PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX + "id", "true")
+          .set(PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX + "id_long", "true")
+          .set(PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX + "id_double", "true")
+          .set(PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX + "id_float", "true")
+          .set(PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX + "id_string", "true")
+          .set(PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX + "id_boolean", "true")
+          .set(PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX + "id_date", "true")
+          .set(PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX + "id_int_decimal", "true")
+          .set(PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX + "id_long_decimal", "true")
+          .set(PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX + "id_fixed_decimal", "true")
+          .commit();
+    }
+
+    table.updateProperties()
+        .set(TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES, "100")  // to have multiple row groups
+        .commit();
+    if (vectorized) {
+      table.updateProperties()
+          .set(TableProperties.PARQUET_VECTORIZATION_ENABLED, "true")
+          .set(TableProperties.PARQUET_BATCH_SIZE, "4")
+          .commit();
+    }
+  }
+
+  protected void dropTable(String name) {
+    catalog.dropTable(TableIdentifier.of("default", name));
+  }
+
+  private DataFile writeDataFile(OutputFile out, StructLike partition, List<Record> rows)

Review Comment:
   What I don't understand is why this is pulling the configuration from table properties to set it on the factory, when you could use `Parquet.newWriter(...).forTable(table)...` to get the same behavior?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on pull request #5035: Add Parquet Row Group Bloom Filter Support for write path

Posted by GitBox <gi...@apache.org>.
rdblue commented on PR #5035:
URL: https://github.com/apache/iceberg/pull/5035#issuecomment-1170225769

   Overall, this looks good to me, although I'm not sure about why the Spark test needs to write the file using an appender directly rather than using the new write support in `Parquet`. It would also be good to add the test to Spark 3.3.
   
   @huaxingao, can you rebase this?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] huaxingao commented on a diff in pull request #5035: Add Parquet Row Group Bloom Filter Support for write path

Posted by GitBox <gi...@apache.org>.
huaxingao commented on code in PR #5035:
URL: https://github.com/apache/iceberg/pull/5035#discussion_r911330816


##########
spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderWithBloomFilter.java:
##########
@@ -0,0 +1,321 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.time.LocalDate;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.iceberg.BaseTable;
+import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Files;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.TestHelpers.Row;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.data.GenericAppenderFactory;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.exceptions.AlreadyExistsException;
+import org.apache.iceberg.hive.HiveCatalog;
+import org.apache.iceberg.hive.TestHiveMetastore;
+import org.apache.iceberg.io.FileAppender;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.spark.SparkValueConverter;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.PropertyUtil;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.SparkSession;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.METASTOREURIS;
+import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT;
+import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT_DEFAULT;
+import static org.apache.iceberg.TableProperties.PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX;
+import static org.apache.iceberg.TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES;
+import static org.apache.iceberg.TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES_DEFAULT;
+
+@RunWith(Parameterized.class)
+public class TestSparkReaderWithBloomFilter {
+
+  protected String tableName = null;
+  protected Table table = null;
+  protected List<Record> records = null;
+  protected DataFile dataFile = null;
+
+  private static TestHiveMetastore metastore = null;
+  protected static SparkSession spark = null;
+  protected static HiveCatalog catalog = null;
+  protected final boolean vectorized;
+  protected final boolean useBloomFilter;
+
+  public TestSparkReaderWithBloomFilter(boolean vectorized, boolean useBloomFilter) {
+    this.vectorized = vectorized;
+    this.useBloomFilter = useBloomFilter;
+  }
+
+  // Schema passed to create tables
+  public static final Schema SCHEMA = new Schema(
+      Types.NestedField.required(1, "id", Types.IntegerType.get()),
+      Types.NestedField.required(2, "id_long", Types.LongType.get()),
+      Types.NestedField.required(3, "id_double", Types.DoubleType.get()),
+      Types.NestedField.required(4, "id_float", Types.FloatType.get()),
+      Types.NestedField.required(5, "id_string", Types.StringType.get()),
+      Types.NestedField.optional(6, "id_boolean", Types.BooleanType.get()),
+      Types.NestedField.optional(7, "id_date", Types.DateType.get()),
+      Types.NestedField.optional(8, "id_int_decimal", Types.DecimalType.of(8, 2)),
+      Types.NestedField.optional(9, "id_long_decimal", Types.DecimalType.of(14, 2)),
+      Types.NestedField.optional(10, "id_fixed_decimal", Types.DecimalType.of(31, 2))
+  );
+
+  private static final int INT_MIN_VALUE = 30;
+  private static final int INT_MAX_VALUE = 329;
+  private static final int INT_VALUE_COUNT = INT_MAX_VALUE - INT_MIN_VALUE + 1;
+  private static final long LONG_BASE = 1000L;
+  private static final double DOUBLE_BASE = 10000D;
+  private static final float FLOAT_BASE = 100000F;
+  private static final String BINARY_PREFIX = "BINARY测试_";
+
+  @Rule
+  public TemporaryFolder temp = new TemporaryFolder();
+
+  @Before
+  public void writeTestDataFile() throws IOException {
+    this.tableName = "test";
+    createTable(tableName, SCHEMA);
+    this.records = Lists.newArrayList();
+
+    // records all use IDs that are in bucket id_bucket=0
+    GenericRecord record = GenericRecord.create(table.schema());
+
+    for (int i = 0; i < INT_VALUE_COUNT; i += 1) {
+      records.add(record.copy(ImmutableMap.of(
+          "id", INT_MIN_VALUE + i,
+          "id_long", LONG_BASE + INT_MIN_VALUE + i,
+          "id_double", DOUBLE_BASE + INT_MIN_VALUE + i,
+          "id_float", FLOAT_BASE + INT_MIN_VALUE + i,
+          "id_string", BINARY_PREFIX + (INT_MIN_VALUE + i),
+          "id_boolean", (i % 2 == 0) ? true : false,
+          "id_date",  LocalDate.parse("2021-09-05"),
+          "id_int_decimal", new BigDecimal(String.valueOf(77.77)),
+          "id_long_decimal", new BigDecimal(String.valueOf(88.88)),
+          "id_fixed_decimal", new BigDecimal(String.valueOf(99.99)))));
+    }
+
+    this.dataFile = writeDataFile(Files.localOutput(temp.newFile()), Row.of(0), records);
+
+    table.newAppend()
+        .appendFile(dataFile)
+        .commit();
+  }
+
+  @After
+  public void cleanup() throws IOException {
+    dropTable("test");
+  }
+
+  @Parameterized.Parameters(name = "vectorized = {0}, useBloomFilter = {1}")
+  public static Object[][] parameters() {
+    return new Object[][] {
+        {false, false}, {true, false}, {false, true}, {true, true}
+    };
+  }
+
+  @BeforeClass
+  public static void startMetastoreAndSpark() {
+    metastore = new TestHiveMetastore();
+    metastore.start();
+    HiveConf hiveConf = metastore.hiveConf();
+
+    spark = SparkSession.builder()
+        .master("local[2]")
+        .config("spark.hadoop." + METASTOREURIS.varname, hiveConf.get(METASTOREURIS.varname))
+        .enableHiveSupport()
+        .getOrCreate();
+
+    catalog = (HiveCatalog)
+        CatalogUtil.loadCatalog(HiveCatalog.class.getName(), "hive", ImmutableMap.of(), hiveConf);
+
+    try {
+      catalog.createNamespace(Namespace.of("default"));
+    } catch (AlreadyExistsException ignored) {
+      // the default namespace already exists. ignore the create error
+    }
+  }
+
+  @AfterClass
+  public static void stopMetastoreAndSpark() throws Exception {
+    catalog = null;
+    metastore.stop();
+    metastore = null;
+    spark.stop();
+    spark = null;
+  }
+
+  protected void createTable(String name, Schema schema) {
+    table = catalog.createTable(TableIdentifier.of("default", name), schema);
+    TableOperations ops = ((BaseTable) table).operations();
+    TableMetadata meta = ops.current();
+    ops.commit(meta, meta.upgradeToFormatVersion(2));
+
+    if (useBloomFilter) {
+      table.updateProperties()
+          .set(PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX + "id", "true")
+          .set(PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX + "id_long", "true")
+          .set(PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX + "id_double", "true")
+          .set(PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX + "id_float", "true")
+          .set(PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX + "id_string", "true")
+          .set(PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX + "id_boolean", "true")
+          .set(PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX + "id_date", "true")
+          .set(PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX + "id_int_decimal", "true")
+          .set(PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX + "id_long_decimal", "true")
+          .set(PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX + "id_fixed_decimal", "true")
+          .commit();
+    }
+
+    table.updateProperties()
+        .set(TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES, "100")  // to have multiple row groups
+        .commit();
+    if (vectorized) {
+      table.updateProperties()
+          .set(TableProperties.PARQUET_VECTORIZATION_ENABLED, "true")
+          .set(TableProperties.PARQUET_BATCH_SIZE, "4")
+          .commit();
+    }
+  }
+
+  protected void dropTable(String name) {
+    catalog.dropTable(TableIdentifier.of("default", name));
+  }
+
+  private DataFile writeDataFile(OutputFile out, StructLike partition, List<Record> rows)

Review Comment:
   Thank you for your comment and sorry for the late reply. Are you asking why I have a bunch of the set bloom filter properties? The reason I set each of the columns is because I want to test bloom filter for all the data types. We don't have a way to set the bloom filters for all the columns (I removed that option).  Actually I don't need to test all the data types because I have already tested them in `TestBloomRowGroupFilter`.  The reason I am adding this Spark test is that I want to have an end to end test. I can keep the test simple to test one column if you prefer that way.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on pull request #5035: Add Parquet Row Group Bloom Filter Support for write path

Posted by GitBox <gi...@apache.org>.
rdblue commented on PR #5035:
URL: https://github.com/apache/iceberg/pull/5035#issuecomment-1171674187

   Yes, thank you @hililiwei and @singhpk234! Sorry I didn't include you above!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] hililiwei commented on pull request #5035: Add Parquet Row Group Bloom Filter Support for write path

Posted by GitBox <gi...@apache.org>.
hililiwei commented on PR #5035:
URL: https://github.com/apache/iceberg/pull/5035#issuecomment-1157298271

   Great work. 👍 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] huaxingao commented on pull request #5035: Add Parquet Row Group Bloom Filter Support for write path

Posted by GitBox <gi...@apache.org>.
huaxingao commented on PR #5035:
URL: https://github.com/apache/iceberg/pull/5035#issuecomment-1171669354

   Also thank you @hililiwei @singhpk234 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] huaxingao commented on a diff in pull request #5035: Add Parquet Row Group Bloom Filter Support for write path

Posted by GitBox <gi...@apache.org>.
huaxingao commented on code in PR #5035:
URL: https://github.com/apache/iceberg/pull/5035#discussion_r910541048


##########
spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderWithBloomFilter.java:
##########
@@ -0,0 +1,321 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.time.LocalDate;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.iceberg.BaseTable;
+import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Files;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.TestHelpers.Row;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.data.GenericAppenderFactory;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.exceptions.AlreadyExistsException;
+import org.apache.iceberg.hive.HiveCatalog;
+import org.apache.iceberg.hive.TestHiveMetastore;
+import org.apache.iceberg.io.FileAppender;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.spark.SparkValueConverter;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.PropertyUtil;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.SparkSession;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.METASTOREURIS;
+import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT;
+import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT_DEFAULT;
+import static org.apache.iceberg.TableProperties.PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX;
+import static org.apache.iceberg.TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES;
+import static org.apache.iceberg.TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES_DEFAULT;
+
+@RunWith(Parameterized.class)
+public class TestSparkReaderWithBloomFilter {
+
+  protected String tableName = null;
+  protected Table table = null;
+  protected List<Record> records = null;
+  protected DataFile dataFile = null;
+
+  private static TestHiveMetastore metastore = null;
+  protected static SparkSession spark = null;
+  protected static HiveCatalog catalog = null;
+  protected final boolean vectorized;
+  protected final boolean useBloomFilter;
+
+  public TestSparkReaderWithBloomFilter(boolean vectorized, boolean useBloomFilter) {
+    this.vectorized = vectorized;
+    this.useBloomFilter = useBloomFilter;
+  }
+
+  // Schema passed to create tables
+  public static final Schema SCHEMA = new Schema(
+      Types.NestedField.required(1, "id", Types.IntegerType.get()),
+      Types.NestedField.required(2, "id_long", Types.LongType.get()),
+      Types.NestedField.required(3, "id_double", Types.DoubleType.get()),
+      Types.NestedField.required(4, "id_float", Types.FloatType.get()),
+      Types.NestedField.required(5, "id_string", Types.StringType.get()),
+      Types.NestedField.optional(6, "id_boolean", Types.BooleanType.get()),
+      Types.NestedField.optional(7, "id_date", Types.DateType.get()),
+      Types.NestedField.optional(8, "id_int_decimal", Types.DecimalType.of(8, 2)),
+      Types.NestedField.optional(9, "id_long_decimal", Types.DecimalType.of(14, 2)),
+      Types.NestedField.optional(10, "id_fixed_decimal", Types.DecimalType.of(31, 2))
+  );
+
+  private static final int INT_MIN_VALUE = 30;
+  private static final int INT_MAX_VALUE = 329;
+  private static final int INT_VALUE_COUNT = INT_MAX_VALUE - INT_MIN_VALUE + 1;
+  private static final long LONG_BASE = 1000L;
+  private static final double DOUBLE_BASE = 10000D;
+  private static final float FLOAT_BASE = 100000F;
+  private static final String BINARY_PREFIX = "BINARY测试_";
+
+  @Rule
+  public TemporaryFolder temp = new TemporaryFolder();
+
+  @Before
+  public void writeTestDataFile() throws IOException {
+    this.tableName = "test";
+    createTable(tableName, SCHEMA);
+    this.records = Lists.newArrayList();
+
+    // records all use IDs that are in bucket id_bucket=0
+    GenericRecord record = GenericRecord.create(table.schema());
+
+    for (int i = 0; i < INT_VALUE_COUNT; i += 1) {
+      records.add(record.copy(ImmutableMap.of(
+          "id", INT_MIN_VALUE + i,
+          "id_long", LONG_BASE + INT_MIN_VALUE + i,
+          "id_double", DOUBLE_BASE + INT_MIN_VALUE + i,
+          "id_float", FLOAT_BASE + INT_MIN_VALUE + i,
+          "id_string", BINARY_PREFIX + (INT_MIN_VALUE + i),
+          "id_boolean", (i % 2 == 0) ? true : false,
+          "id_date",  LocalDate.parse("2021-09-05"),
+          "id_int_decimal", new BigDecimal(String.valueOf(77.77)),
+          "id_long_decimal", new BigDecimal(String.valueOf(88.88)),
+          "id_fixed_decimal", new BigDecimal(String.valueOf(99.99)))));
+    }
+
+    this.dataFile = writeDataFile(Files.localOutput(temp.newFile()), Row.of(0), records);
+
+    table.newAppend()
+        .appendFile(dataFile)
+        .commit();
+  }
+
+  @After
+  public void cleanup() throws IOException {
+    dropTable("test");
+  }
+
+  @Parameterized.Parameters(name = "vectorized = {0}, useBloomFilter = {1}")
+  public static Object[][] parameters() {
+    return new Object[][] {
+        {false, false}, {true, false}, {false, true}, {true, true}
+    };
+  }
+
+  @BeforeClass
+  public static void startMetastoreAndSpark() {
+    metastore = new TestHiveMetastore();
+    metastore.start();
+    HiveConf hiveConf = metastore.hiveConf();
+
+    spark = SparkSession.builder()
+        .master("local[2]")
+        .config("spark.hadoop." + METASTOREURIS.varname, hiveConf.get(METASTOREURIS.varname))
+        .enableHiveSupport()
+        .getOrCreate();
+
+    catalog = (HiveCatalog)
+        CatalogUtil.loadCatalog(HiveCatalog.class.getName(), "hive", ImmutableMap.of(), hiveConf);
+
+    try {
+      catalog.createNamespace(Namespace.of("default"));
+    } catch (AlreadyExistsException ignored) {
+      // the default namespace already exists. ignore the create error
+    }
+  }
+
+  @AfterClass
+  public static void stopMetastoreAndSpark() throws Exception {
+    catalog = null;
+    metastore.stop();
+    metastore = null;
+    spark.stop();
+    spark = null;
+  }
+
+  protected void createTable(String name, Schema schema) {
+    table = catalog.createTable(TableIdentifier.of("default", name), schema);
+    TableOperations ops = ((BaseTable) table).operations();
+    TableMetadata meta = ops.current();
+    ops.commit(meta, meta.upgradeToFormatVersion(2));
+
+    if (useBloomFilter) {
+      table.updateProperties()
+          .set(PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX + "id", "true")
+          .set(PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX + "id_long", "true")
+          .set(PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX + "id_double", "true")
+          .set(PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX + "id_float", "true")
+          .set(PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX + "id_string", "true")
+          .set(PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX + "id_boolean", "true")
+          .set(PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX + "id_date", "true")
+          .set(PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX + "id_int_decimal", "true")
+          .set(PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX + "id_long_decimal", "true")
+          .set(PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX + "id_fixed_decimal", "true")
+          .commit();
+    }
+
+    table.updateProperties()
+        .set(TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES, "100")  // to have multiple row groups
+        .commit();
+    if (vectorized) {
+      table.updateProperties()
+          .set(TableProperties.PARQUET_VECTORIZATION_ENABLED, "true")
+          .set(TableProperties.PARQUET_BATCH_SIZE, "4")
+          .commit();
+    }
+  }
+
+  protected void dropTable(String name) {
+    catalog.dropTable(TableIdentifier.of("default", name));
+  }
+
+  private DataFile writeDataFile(OutputFile out, StructLike partition, List<Record> rows)

Review Comment:
   I followed this example https://github.com/apache/iceberg/blob/master/data/src/test/java/org/apache/iceberg/data/FileHelpers.java#L114



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] huaxingao commented on a diff in pull request #5035: Add Parquet Row Group Bloom Filter Support for write path

Posted by GitBox <gi...@apache.org>.
huaxingao commented on code in PR #5035:
URL: https://github.com/apache/iceberg/pull/5035#discussion_r911438530


##########
spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderWithBloomFilter.java:
##########
@@ -0,0 +1,321 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.time.LocalDate;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.iceberg.BaseTable;
+import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Files;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.TestHelpers.Row;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.data.GenericAppenderFactory;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.exceptions.AlreadyExistsException;
+import org.apache.iceberg.hive.HiveCatalog;
+import org.apache.iceberg.hive.TestHiveMetastore;
+import org.apache.iceberg.io.FileAppender;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.spark.SparkValueConverter;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.PropertyUtil;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.SparkSession;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.METASTOREURIS;
+import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT;
+import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT_DEFAULT;
+import static org.apache.iceberg.TableProperties.PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX;
+import static org.apache.iceberg.TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES;
+import static org.apache.iceberg.TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES_DEFAULT;
+
+@RunWith(Parameterized.class)
+public class TestSparkReaderWithBloomFilter {
+
+  protected String tableName = null;
+  protected Table table = null;
+  protected List<Record> records = null;
+  protected DataFile dataFile = null;
+
+  private static TestHiveMetastore metastore = null;
+  protected static SparkSession spark = null;
+  protected static HiveCatalog catalog = null;
+  protected final boolean vectorized;
+  protected final boolean useBloomFilter;
+
+  public TestSparkReaderWithBloomFilter(boolean vectorized, boolean useBloomFilter) {
+    this.vectorized = vectorized;
+    this.useBloomFilter = useBloomFilter;
+  }
+
+  // Schema passed to create tables
+  public static final Schema SCHEMA = new Schema(
+      Types.NestedField.required(1, "id", Types.IntegerType.get()),
+      Types.NestedField.required(2, "id_long", Types.LongType.get()),
+      Types.NestedField.required(3, "id_double", Types.DoubleType.get()),
+      Types.NestedField.required(4, "id_float", Types.FloatType.get()),
+      Types.NestedField.required(5, "id_string", Types.StringType.get()),
+      Types.NestedField.optional(6, "id_boolean", Types.BooleanType.get()),
+      Types.NestedField.optional(7, "id_date", Types.DateType.get()),
+      Types.NestedField.optional(8, "id_int_decimal", Types.DecimalType.of(8, 2)),
+      Types.NestedField.optional(9, "id_long_decimal", Types.DecimalType.of(14, 2)),
+      Types.NestedField.optional(10, "id_fixed_decimal", Types.DecimalType.of(31, 2))
+  );
+
+  private static final int INT_MIN_VALUE = 30;
+  private static final int INT_MAX_VALUE = 329;
+  private static final int INT_VALUE_COUNT = INT_MAX_VALUE - INT_MIN_VALUE + 1;
+  private static final long LONG_BASE = 1000L;
+  private static final double DOUBLE_BASE = 10000D;
+  private static final float FLOAT_BASE = 100000F;
+  private static final String BINARY_PREFIX = "BINARY测试_";
+
+  @Rule
+  public TemporaryFolder temp = new TemporaryFolder();
+
+  @Before
+  public void writeTestDataFile() throws IOException {
+    this.tableName = "test";
+    createTable(tableName, SCHEMA);
+    this.records = Lists.newArrayList();
+
+    // records all use IDs that are in bucket id_bucket=0
+    GenericRecord record = GenericRecord.create(table.schema());
+
+    for (int i = 0; i < INT_VALUE_COUNT; i += 1) {
+      records.add(record.copy(ImmutableMap.of(
+          "id", INT_MIN_VALUE + i,
+          "id_long", LONG_BASE + INT_MIN_VALUE + i,
+          "id_double", DOUBLE_BASE + INT_MIN_VALUE + i,
+          "id_float", FLOAT_BASE + INT_MIN_VALUE + i,
+          "id_string", BINARY_PREFIX + (INT_MIN_VALUE + i),
+          "id_boolean", (i % 2 == 0) ? true : false,
+          "id_date",  LocalDate.parse("2021-09-05"),
+          "id_int_decimal", new BigDecimal(String.valueOf(77.77)),
+          "id_long_decimal", new BigDecimal(String.valueOf(88.88)),
+          "id_fixed_decimal", new BigDecimal(String.valueOf(99.99)))));
+    }
+
+    this.dataFile = writeDataFile(Files.localOutput(temp.newFile()), Row.of(0), records);
+
+    table.newAppend()
+        .appendFile(dataFile)
+        .commit();
+  }
+
+  @After
+  public void cleanup() throws IOException {
+    dropTable("test");
+  }
+
+  @Parameterized.Parameters(name = "vectorized = {0}, useBloomFilter = {1}")
+  public static Object[][] parameters() {
+    return new Object[][] {
+        {false, false}, {true, false}, {false, true}, {true, true}
+    };
+  }
+
+  @BeforeClass
+  public static void startMetastoreAndSpark() {
+    metastore = new TestHiveMetastore();
+    metastore.start();
+    HiveConf hiveConf = metastore.hiveConf();
+
+    spark = SparkSession.builder()
+        .master("local[2]")
+        .config("spark.hadoop." + METASTOREURIS.varname, hiveConf.get(METASTOREURIS.varname))
+        .enableHiveSupport()
+        .getOrCreate();
+
+    catalog = (HiveCatalog)
+        CatalogUtil.loadCatalog(HiveCatalog.class.getName(), "hive", ImmutableMap.of(), hiveConf);
+
+    try {
+      catalog.createNamespace(Namespace.of("default"));
+    } catch (AlreadyExistsException ignored) {
+      // the default namespace already exists. ignore the create error
+    }
+  }
+
+  @AfterClass
+  public static void stopMetastoreAndSpark() throws Exception {
+    catalog = null;
+    metastore.stop();
+    metastore = null;
+    spark.stop();
+    spark = null;
+  }
+
+  protected void createTable(String name, Schema schema) {
+    table = catalog.createTable(TableIdentifier.of("default", name), schema);
+    TableOperations ops = ((BaseTable) table).operations();
+    TableMetadata meta = ops.current();
+    ops.commit(meta, meta.upgradeToFormatVersion(2));
+
+    if (useBloomFilter) {
+      table.updateProperties()
+          .set(PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX + "id", "true")
+          .set(PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX + "id_long", "true")
+          .set(PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX + "id_double", "true")
+          .set(PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX + "id_float", "true")
+          .set(PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX + "id_string", "true")
+          .set(PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX + "id_boolean", "true")
+          .set(PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX + "id_date", "true")
+          .set(PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX + "id_int_decimal", "true")
+          .set(PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX + "id_long_decimal", "true")
+          .set(PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX + "id_fixed_decimal", "true")
+          .commit();
+    }
+
+    table.updateProperties()
+        .set(TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES, "100")  // to have multiple row groups
+        .commit();
+    if (vectorized) {
+      table.updateProperties()
+          .set(TableProperties.PARQUET_VECTORIZATION_ENABLED, "true")
+          .set(TableProperties.PARQUET_BATCH_SIZE, "4")
+          .commit();
+    }
+  }
+
+  protected void dropTable(String name) {
+    catalog.dropTable(TableIdentifier.of("default", name));
+  }
+
+  private DataFile writeDataFile(OutputFile out, StructLike partition, List<Record> rows)

Review Comment:
   I will take a look and probably fix this in a follow up.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a diff in pull request #5035: Add Parquet Row Group Bloom Filter Support for write path

Posted by GitBox <gi...@apache.org>.
rdblue commented on code in PR #5035:
URL: https://github.com/apache/iceberg/pull/5035#discussion_r910544270


##########
spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderWithBloomFilter.java:
##########
@@ -0,0 +1,321 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.time.LocalDate;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.iceberg.BaseTable;
+import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Files;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.TestHelpers.Row;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.data.GenericAppenderFactory;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.exceptions.AlreadyExistsException;
+import org.apache.iceberg.hive.HiveCatalog;
+import org.apache.iceberg.hive.TestHiveMetastore;
+import org.apache.iceberg.io.FileAppender;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.spark.SparkValueConverter;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.PropertyUtil;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.SparkSession;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.METASTOREURIS;
+import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT;
+import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT_DEFAULT;
+import static org.apache.iceberg.TableProperties.PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX;
+import static org.apache.iceberg.TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES;
+import static org.apache.iceberg.TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES_DEFAULT;
+
+@RunWith(Parameterized.class)
+public class TestSparkReaderWithBloomFilter {
+
+  protected String tableName = null;
+  protected Table table = null;
+  protected List<Record> records = null;
+  protected DataFile dataFile = null;
+
+  private static TestHiveMetastore metastore = null;
+  protected static SparkSession spark = null;
+  protected static HiveCatalog catalog = null;
+  protected final boolean vectorized;
+  protected final boolean useBloomFilter;
+
+  public TestSparkReaderWithBloomFilter(boolean vectorized, boolean useBloomFilter) {
+    this.vectorized = vectorized;
+    this.useBloomFilter = useBloomFilter;
+  }
+
+  // Schema passed to create tables
+  public static final Schema SCHEMA = new Schema(
+      Types.NestedField.required(1, "id", Types.IntegerType.get()),
+      Types.NestedField.required(2, "id_long", Types.LongType.get()),
+      Types.NestedField.required(3, "id_double", Types.DoubleType.get()),
+      Types.NestedField.required(4, "id_float", Types.FloatType.get()),
+      Types.NestedField.required(5, "id_string", Types.StringType.get()),
+      Types.NestedField.optional(6, "id_boolean", Types.BooleanType.get()),
+      Types.NestedField.optional(7, "id_date", Types.DateType.get()),
+      Types.NestedField.optional(8, "id_int_decimal", Types.DecimalType.of(8, 2)),
+      Types.NestedField.optional(9, "id_long_decimal", Types.DecimalType.of(14, 2)),
+      Types.NestedField.optional(10, "id_fixed_decimal", Types.DecimalType.of(31, 2))
+  );
+
+  private static final int INT_MIN_VALUE = 30;
+  private static final int INT_MAX_VALUE = 329;
+  private static final int INT_VALUE_COUNT = INT_MAX_VALUE - INT_MIN_VALUE + 1;
+  private static final long LONG_BASE = 1000L;
+  private static final double DOUBLE_BASE = 10000D;
+  private static final float FLOAT_BASE = 100000F;
+  private static final String BINARY_PREFIX = "BINARY测试_";
+
+  @Rule
+  public TemporaryFolder temp = new TemporaryFolder();
+
+  @Before
+  public void writeTestDataFile() throws IOException {
+    this.tableName = "test";
+    createTable(tableName, SCHEMA);
+    this.records = Lists.newArrayList();
+
+    // records all use IDs that are in bucket id_bucket=0
+    GenericRecord record = GenericRecord.create(table.schema());
+
+    for (int i = 0; i < INT_VALUE_COUNT; i += 1) {
+      records.add(record.copy(ImmutableMap.of(
+          "id", INT_MIN_VALUE + i,
+          "id_long", LONG_BASE + INT_MIN_VALUE + i,
+          "id_double", DOUBLE_BASE + INT_MIN_VALUE + i,
+          "id_float", FLOAT_BASE + INT_MIN_VALUE + i,
+          "id_string", BINARY_PREFIX + (INT_MIN_VALUE + i),
+          "id_boolean", (i % 2 == 0) ? true : false,
+          "id_date",  LocalDate.parse("2021-09-05"),
+          "id_int_decimal", new BigDecimal(String.valueOf(77.77)),
+          "id_long_decimal", new BigDecimal(String.valueOf(88.88)),
+          "id_fixed_decimal", new BigDecimal(String.valueOf(99.99)))));
+    }
+
+    this.dataFile = writeDataFile(Files.localOutput(temp.newFile()), Row.of(0), records);
+
+    table.newAppend()
+        .appendFile(dataFile)
+        .commit();
+  }
+
+  @After
+  public void cleanup() throws IOException {
+    dropTable("test");
+  }
+
+  @Parameterized.Parameters(name = "vectorized = {0}, useBloomFilter = {1}")
+  public static Object[][] parameters() {
+    return new Object[][] {
+        {false, false}, {true, false}, {false, true}, {true, true}
+    };
+  }
+
+  @BeforeClass
+  public static void startMetastoreAndSpark() {
+    metastore = new TestHiveMetastore();
+    metastore.start();
+    HiveConf hiveConf = metastore.hiveConf();
+
+    spark = SparkSession.builder()
+        .master("local[2]")
+        .config("spark.hadoop." + METASTOREURIS.varname, hiveConf.get(METASTOREURIS.varname))
+        .enableHiveSupport()
+        .getOrCreate();
+
+    catalog = (HiveCatalog)
+        CatalogUtil.loadCatalog(HiveCatalog.class.getName(), "hive", ImmutableMap.of(), hiveConf);
+
+    try {
+      catalog.createNamespace(Namespace.of("default"));
+    } catch (AlreadyExistsException ignored) {
+      // the default namespace already exists. ignore the create error
+    }
+  }
+
+  @AfterClass
+  public static void stopMetastoreAndSpark() throws Exception {
+    catalog = null;
+    metastore.stop();
+    metastore = null;
+    spark.stop();
+    spark = null;
+  }
+
+  protected void createTable(String name, Schema schema) {
+    table = catalog.createTable(TableIdentifier.of("default", name), schema);
+    TableOperations ops = ((BaseTable) table).operations();
+    TableMetadata meta = ops.current();
+    ops.commit(meta, meta.upgradeToFormatVersion(2));
+
+    if (useBloomFilter) {
+      table.updateProperties()
+          .set(PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX + "id", "true")
+          .set(PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX + "id_long", "true")
+          .set(PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX + "id_double", "true")
+          .set(PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX + "id_float", "true")
+          .set(PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX + "id_string", "true")
+          .set(PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX + "id_boolean", "true")
+          .set(PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX + "id_date", "true")
+          .set(PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX + "id_int_decimal", "true")
+          .set(PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX + "id_long_decimal", "true")
+          .set(PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX + "id_fixed_decimal", "true")
+          .commit();
+    }
+
+    table.updateProperties()
+        .set(TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES, "100")  // to have multiple row groups
+        .commit();
+    if (vectorized) {
+      table.updateProperties()
+          .set(TableProperties.PARQUET_VECTORIZATION_ENABLED, "true")
+          .set(TableProperties.PARQUET_BATCH_SIZE, "4")
+          .commit();
+    }
+  }
+
+  protected void dropTable(String name) {
+    catalog.dropTable(TableIdentifier.of("default", name));
+  }
+
+  private DataFile writeDataFile(OutputFile out, StructLike partition, List<Record> rows)

Review Comment:
   That's fine, I just don't understand why all of the properties are set on the factory. Is there an easier way to do that using the `Parquet` utils that were updated in this PR?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on pull request #5035: Add Parquet Row Group Bloom Filter Support for write path

Posted by GitBox <gi...@apache.org>.
rdblue commented on PR #5035:
URL: https://github.com/apache/iceberg/pull/5035#issuecomment-1171663307

   Thanks, @huaxingao!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a diff in pull request #5035: Add Parquet Row Group Bloom Filter Support for write path

Posted by GitBox <gi...@apache.org>.
rdblue commented on code in PR #5035:
URL: https://github.com/apache/iceberg/pull/5035#discussion_r910189031


##########
spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderWithBloomFilter.java:
##########
@@ -0,0 +1,321 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.time.LocalDate;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.iceberg.BaseTable;
+import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Files;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.TestHelpers.Row;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.data.GenericAppenderFactory;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.exceptions.AlreadyExistsException;
+import org.apache.iceberg.hive.HiveCatalog;
+import org.apache.iceberg.hive.TestHiveMetastore;
+import org.apache.iceberg.io.FileAppender;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.spark.SparkValueConverter;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.PropertyUtil;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.SparkSession;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.METASTOREURIS;
+import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT;
+import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT_DEFAULT;
+import static org.apache.iceberg.TableProperties.PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX;
+import static org.apache.iceberg.TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES;
+import static org.apache.iceberg.TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES_DEFAULT;
+
+@RunWith(Parameterized.class)
+public class TestSparkReaderWithBloomFilter {
+
+  protected String tableName = null;
+  protected Table table = null;
+  protected List<Record> records = null;
+  protected DataFile dataFile = null;
+
+  private static TestHiveMetastore metastore = null;
+  protected static SparkSession spark = null;
+  protected static HiveCatalog catalog = null;
+  protected final boolean vectorized;
+  protected final boolean useBloomFilter;
+
+  public TestSparkReaderWithBloomFilter(boolean vectorized, boolean useBloomFilter) {
+    this.vectorized = vectorized;
+    this.useBloomFilter = useBloomFilter;
+  }
+
+  // Schema passed to create tables
+  public static final Schema SCHEMA = new Schema(
+      Types.NestedField.required(1, "id", Types.IntegerType.get()),
+      Types.NestedField.required(2, "id_long", Types.LongType.get()),
+      Types.NestedField.required(3, "id_double", Types.DoubleType.get()),
+      Types.NestedField.required(4, "id_float", Types.FloatType.get()),
+      Types.NestedField.required(5, "id_string", Types.StringType.get()),
+      Types.NestedField.optional(6, "id_boolean", Types.BooleanType.get()),
+      Types.NestedField.optional(7, "id_date", Types.DateType.get()),
+      Types.NestedField.optional(8, "id_int_decimal", Types.DecimalType.of(8, 2)),
+      Types.NestedField.optional(9, "id_long_decimal", Types.DecimalType.of(14, 2)),
+      Types.NestedField.optional(10, "id_fixed_decimal", Types.DecimalType.of(31, 2))
+  );
+
+  private static final int INT_MIN_VALUE = 30;
+  private static final int INT_MAX_VALUE = 329;
+  private static final int INT_VALUE_COUNT = INT_MAX_VALUE - INT_MIN_VALUE + 1;
+  private static final long LONG_BASE = 1000L;
+  private static final double DOUBLE_BASE = 10000D;
+  private static final float FLOAT_BASE = 100000F;
+  private static final String BINARY_PREFIX = "BINARY测试_";
+
+  @Rule
+  public TemporaryFolder temp = new TemporaryFolder();
+
+  @Before
+  public void writeTestDataFile() throws IOException {
+    this.tableName = "test";
+    createTable(tableName, SCHEMA);
+    this.records = Lists.newArrayList();
+
+    // records all use IDs that are in bucket id_bucket=0
+    GenericRecord record = GenericRecord.create(table.schema());
+
+    for (int i = 0; i < INT_VALUE_COUNT; i += 1) {
+      records.add(record.copy(ImmutableMap.of(
+          "id", INT_MIN_VALUE + i,
+          "id_long", LONG_BASE + INT_MIN_VALUE + i,
+          "id_double", DOUBLE_BASE + INT_MIN_VALUE + i,
+          "id_float", FLOAT_BASE + INT_MIN_VALUE + i,
+          "id_string", BINARY_PREFIX + (INT_MIN_VALUE + i),
+          "id_boolean", (i % 2 == 0) ? true : false,
+          "id_date",  LocalDate.parse("2021-09-05"),
+          "id_int_decimal", new BigDecimal(String.valueOf(77.77)),
+          "id_long_decimal", new BigDecimal(String.valueOf(88.88)),
+          "id_fixed_decimal", new BigDecimal(String.valueOf(99.99)))));
+    }
+
+    this.dataFile = writeDataFile(Files.localOutput(temp.newFile()), Row.of(0), records);
+
+    table.newAppend()
+        .appendFile(dataFile)
+        .commit();
+  }
+
+  @After
+  public void cleanup() throws IOException {
+    dropTable("test");
+  }
+
+  @Parameterized.Parameters(name = "vectorized = {0}, useBloomFilter = {1}")
+  public static Object[][] parameters() {
+    return new Object[][] {
+        {false, false}, {true, false}, {false, true}, {true, true}
+    };
+  }
+
+  @BeforeClass
+  public static void startMetastoreAndSpark() {
+    metastore = new TestHiveMetastore();
+    metastore.start();
+    HiveConf hiveConf = metastore.hiveConf();
+
+    spark = SparkSession.builder()
+        .master("local[2]")
+        .config("spark.hadoop." + METASTOREURIS.varname, hiveConf.get(METASTOREURIS.varname))
+        .enableHiveSupport()
+        .getOrCreate();
+
+    catalog = (HiveCatalog)
+        CatalogUtil.loadCatalog(HiveCatalog.class.getName(), "hive", ImmutableMap.of(), hiveConf);
+
+    try {
+      catalog.createNamespace(Namespace.of("default"));
+    } catch (AlreadyExistsException ignored) {
+      // the default namespace already exists. ignore the create error
+    }
+  }
+
+  @AfterClass
+  public static void stopMetastoreAndSpark() throws Exception {
+    catalog = null;
+    metastore.stop();
+    metastore = null;
+    spark.stop();
+    spark = null;
+  }
+
+  protected void createTable(String name, Schema schema) {
+    table = catalog.createTable(TableIdentifier.of("default", name), schema);
+    TableOperations ops = ((BaseTable) table).operations();
+    TableMetadata meta = ops.current();
+    ops.commit(meta, meta.upgradeToFormatVersion(2));
+
+    if (useBloomFilter) {
+      table.updateProperties()
+          .set(PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX + "id", "true")
+          .set(PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX + "id_long", "true")
+          .set(PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX + "id_double", "true")
+          .set(PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX + "id_float", "true")
+          .set(PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX + "id_string", "true")
+          .set(PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX + "id_boolean", "true")
+          .set(PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX + "id_date", "true")
+          .set(PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX + "id_int_decimal", "true")
+          .set(PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX + "id_long_decimal", "true")
+          .set(PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX + "id_fixed_decimal", "true")
+          .commit();
+    }
+
+    table.updateProperties()
+        .set(TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES, "100")  // to have multiple row groups
+        .commit();
+    if (vectorized) {
+      table.updateProperties()
+          .set(TableProperties.PARQUET_VECTORIZATION_ENABLED, "true")
+          .set(TableProperties.PARQUET_BATCH_SIZE, "4")
+          .commit();
+    }
+  }
+
+  protected void dropTable(String name) {
+    catalog.dropTable(TableIdentifier.of("default", name));
+  }
+
+  private DataFile writeDataFile(OutputFile out, StructLike partition, List<Record> rows)

Review Comment:
   Why not use the `Parquet` utils to write the file?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] huaxingao commented on a diff in pull request #5035: Add Parquet Row Group Bloom Filter Support for write path

Posted by GitBox <gi...@apache.org>.
huaxingao commented on code in PR #5035:
URL: https://github.com/apache/iceberg/pull/5035#discussion_r897327666


##########
parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java:
##########
@@ -293,19 +305,29 @@ public <D> FileAppender<D> build() throws IOException {
             .withRowGroupSize(rowGroupSize)
             .withPageSize(pageSize)
             .withDictionaryPageSize(dictionaryPageSize);
-        // Todo: The following code needs to be improved in the bloom filter write path PR.
-        for (Map.Entry<String, String> entry : config.entrySet()) {
-          String key = entry.getKey();
-          if (key.startsWith(PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX)) {
-            String columnPath = key.replaceFirst(PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX, "");
-            String value = entry.getValue();
-            parquetWriteBuilder.withBloomFilterEnabled(columnPath, Boolean.valueOf(value));
-          }
+
+        for (Map.Entry<String, String> entry : columnBloomFilterEnabled.entrySet()) {
+          String colPath = entry.getKey();
+          String bloomEnabled = entry.getValue();
+          parquetWriteBuilder.withBloomFilterEnabled(colPath, Boolean.valueOf(bloomEnabled));
         }
+
         return new ParquetWriteAdapter<>(parquetWriteBuilder.build(), metricsConfig);
       }
     }
 
+    private static Map<String, String> bloomColumnConfigMap(String prefix, Map<String, String> config) {
+      Map<String, String> columnBloomFilterConfig = Maps.newHashMap();
+      config.keySet().stream()
+          .filter(key -> key.startsWith(prefix))
+          .forEach(key -> {
+            String columnPath = key.replaceFirst(prefix, "");
+            String bloomFilterMode = config.get(key);
+            columnBloomFilterConfig.put(columnPath, bloomFilterMode);
+          });
+      return columnBloomFilterConfig;
+    }

Review Comment:
   Changed. Thanks!



##########
docs/tables/configuration.md:
##########
@@ -50,6 +50,8 @@ Iceberg tables support table properties to configure table behavior, like the de
 | write.parquet.dict-size-bytes      | 2097152 (2 MB)     | Parquet dictionary page size                       |
 | write.parquet.compression-codec    | gzip               | Parquet compression codec: zstd, brotli, lz4, gzip, snappy, uncompressed |
 | write.parquet.compression-level    | null               | Parquet compression level                          |
+| write.parquet.bloom-filter-enabled.column.col1          | (not set) | Enables writing a bloom filter for the column |

Review Comment:
   Sounds good. Changed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] huaxingao commented on pull request #5035: Add Parquet Row Group Bloom Filter Support for write path

Posted by GitBox <gi...@apache.org>.
huaxingao commented on PR #5035:
URL: https://github.com/apache/iceberg/pull/5035#issuecomment-1155788005

   cc @rdblue @RussellSpitzer @kbendick @chenjunjiedada @hililiwei


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] huaxingao commented on pull request #5035: Add Parquet Row Group Bloom Filter Support for write path

Posted by GitBox <gi...@apache.org>.
huaxingao commented on PR #5035:
URL: https://github.com/apache/iceberg/pull/5035#issuecomment-1171668121

   Thank you very much! @rdblue 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue merged pull request #5035: Add Parquet Row Group Bloom Filter Support for write path

Posted by GitBox <gi...@apache.org>.
rdblue merged PR #5035:
URL: https://github.com/apache/iceberg/pull/5035


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] singhpk234 commented on a diff in pull request #5035: Add Parquet Row Group Bloom Filter Support for write path

Posted by GitBox <gi...@apache.org>.
singhpk234 commented on code in PR #5035:
URL: https://github.com/apache/iceberg/pull/5035#discussion_r897151281


##########
docs/tables/configuration.md:
##########
@@ -50,6 +50,8 @@ Iceberg tables support table properties to configure table behavior, like the de
 | write.parquet.dict-size-bytes      | 2097152 (2 MB)     | Parquet dictionary page size                       |
 | write.parquet.compression-codec    | gzip               | Parquet compression codec: zstd, brotli, lz4, gzip, snappy, uncompressed |
 | write.parquet.compression-level    | null               | Parquet compression level                          |
+| write.parquet.bloom-filter-enabled.column.col1          | (not set) | Enables writing a bloom filter for the column |

Review Comment:
   [minor] should we say it enables bf for col1 : `Enables writing a bloom filter for the column : col1`
   
   like we did [here](https://github.com/apache/iceberg/pull/5035/files#diff-59125802f6fa3d6d51c716484c3cdff3d85af7c834b8102296b2321b17aec696R64)



##########
parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java:
##########
@@ -293,19 +305,29 @@ public <D> FileAppender<D> build() throws IOException {
             .withRowGroupSize(rowGroupSize)
             .withPageSize(pageSize)
             .withDictionaryPageSize(dictionaryPageSize);
-        // Todo: The following code needs to be improved in the bloom filter write path PR.
-        for (Map.Entry<String, String> entry : config.entrySet()) {
-          String key = entry.getKey();
-          if (key.startsWith(PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX)) {
-            String columnPath = key.replaceFirst(PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX, "");
-            String value = entry.getValue();
-            parquetWriteBuilder.withBloomFilterEnabled(columnPath, Boolean.valueOf(value));
-          }
+
+        for (Map.Entry<String, String> entry : columnBloomFilterEnabled.entrySet()) {
+          String colPath = entry.getKey();
+          String bloomEnabled = entry.getValue();
+          parquetWriteBuilder.withBloomFilterEnabled(colPath, Boolean.valueOf(bloomEnabled));
         }
+
         return new ParquetWriteAdapter<>(parquetWriteBuilder.build(), metricsConfig);
       }
     }
 
+    private static Map<String, String> bloomColumnConfigMap(String prefix, Map<String, String> config) {
+      Map<String, String> columnBloomFilterConfig = Maps.newHashMap();
+      config.keySet().stream()
+          .filter(key -> key.startsWith(prefix))
+          .forEach(key -> {
+            String columnPath = key.replaceFirst(prefix, "");
+            String bloomFilterMode = config.get(key);
+            columnBloomFilterConfig.put(columnPath, bloomFilterMode);
+          });
+      return columnBloomFilterConfig;
+    }

Review Comment:
   [minor] can use this util [PropertyUtil#propertiesWithPrefix](https://github.com/apache/iceberg/blob/master/core/src/main/java/org/apache/iceberg/util/PropertyUtil.java#L85-L97)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org