Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2021/09/16 17:37:55 UTC

[GitHub] [iceberg] aokolnychyi opened a new pull request #3132: Spark: Add SparkReadConf and SparkWriteConf

aokolnychyi opened a new pull request #3132:
URL: https://github.com/apache/iceberg/pull/3132


   I think we can improve the way we currently handle our Spark configuration. Specifically, I see a few issues.
   
   - Our SQL configs are scattered across a number of classes.
   
   For example, we have some SQL configs in `SparkUtil`, `IcebergSource`, `SparkWriteBuilder`, and `Spark3Util`. I think having a separate class for such constants would make sense, given that we already have `SparkReadOptions` and `SparkWriteOptions`.
   
   - We repeat the same code to parse the read/write options, session conf, and table metadata in multiple places.
   
   For example, we duplicate the code for getting the write file format, the split-related configs, whether to check nullability and the order of incoming columns, etc. This happens partly because we have Spark 2 and Spark 3 integrations, but also because we have multiple scans/writers in Spark 3. With upcoming support for merge-on-read, there will be even more classes where we would need to parse the same arguments.
   
   - The config resolution logic complicates the classes where it is defined.
   
   Our writer and scan builders are already complicated, so removing the config resolution logic and dedicating a separate class to it seems like a promising idea.
   
   - We have an inconsistent precedence order for Spark configs.
   
   Historically, we have been inconsistent about whether a read/write option should take precedence over the session config. At some point, we had a [discussion](https://github.com/apache/iceberg/pull/2248#discussion_r580693954) and reached a consensus that the most common way to think about it is read/write option -> session conf -> table metadata. There are still places where the session config overrides options. I think we should finally fix that and be consistent, even though it may be a behavior change.
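
As a rough illustration of the precedence described above (option, then session conf, then table metadata), the lookup could be sketched as follows. This is only a sketch under assumptions: plain `Map`s stand in for the Spark session conf and the table properties, and the `PrecedenceSketch` class, its `resolve` method, and all key names passed to it are hypothetical and not part of the PR.

```java
import java.util.Map;

// Hypothetical illustration only: plain maps stand in for the read/write options,
// the Spark session conf, and the Iceberg table properties.
final class PrecedenceSketch {

  private PrecedenceSketch() {
  }

  // Returns the first non-null value found in this order:
  // option -> session conf -> table property -> default value.
  static String resolve(
      Map<String, String> options, String optionName,
      Map<String, String> sessionConf, String sessionConfName,
      Map<String, String> tableProperties, String tablePropertyName,
      String defaultValue) {

    String optionValue = options.get(optionName);
    if (optionValue != null) {
      return optionValue;
    }

    String sessionConfValue = sessionConf.get(sessionConfName);
    if (sessionConfValue != null) {
      return sessionConfValue;
    }

    String tablePropertyValue = tableProperties.get(tablePropertyName);
    return tablePropertyValue != null ? tablePropertyValue : defaultValue;
  }
}
```

When all three sources define a value, `resolve` returns the option value; removing the option falls back to the session conf, then to the table property, and finally to the supplied default.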


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] RussellSpitzer commented on a change in pull request #3132: Spark: Add SparkReadConf and SparkWriteConf

Posted by GitBox <gi...@apache.org>.
RussellSpitzer commented on a change in pull request #3132:
URL: https://github.com/apache/iceberg/pull/3132#discussion_r715180685



##########
File path: spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java
##########
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark;
+
+import java.util.Locale;
+import java.util.Map;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.SnapshotSummary;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.util.PropertyUtil;
+import org.apache.spark.sql.RuntimeConfig;
+import org.apache.spark.sql.SparkSession;
+
+/**
+ * A class for common Iceberg configs for Spark writes.
+ * <p>
+ * If a config is set at multiple levels, the following order of precedence is used (top to bottom):
+ * <ol>
+ *   <li>Write options</li>
+ *   <li>Session configuration</li>
+ *   <li>Table metadata</li>
+ * </ol>
+ * The most specific value is set in write options and takes precedence over all other configs.
+ * If no write option is provided, this class checks the session configuration for any overrides.
+ * If no applicable value is found in the session configuration, this class uses the table metadata.
+ * <p>
+ * Note this class is NOT meant to be serialized and sent to executors.
+ */
+public class SparkWriteConf {
+
+  private final Table table;
+  private final RuntimeConfig sessionConf;
+  private final Map<String, String> writeOptions;
+
+  public SparkWriteConf(SparkSession spark, Table table, Map<String, String> writeOptions) {
+    this.table = table;
+    this.sessionConf = spark.conf();
+    this.writeOptions = writeOptions;
+  }
+
+  public boolean checkNullability() {
+    String writeOptionValue = writeOptions.get(SparkWriteOptions.CHECK_NULLABILITY);
+    if (writeOptionValue != null) {
+      return Boolean.parseBoolean(writeOptionValue);
+    }
+
+    String sessionConfValue = sessionConf.get(SparkSQLConfigs.CHECK_NULLABILITY, null);
+    if (sessionConfValue != null) {
+      return Boolean.parseBoolean(sessionConfValue);
+    }
+
+    return true;
+  }
+
+  public boolean checkOrdering() {
+    String writeOptionValue = writeOptions.get(SparkWriteOptions.CHECK_ORDERING);
+    if (writeOptionValue != null) {
+      return Boolean.parseBoolean(writeOptionValue);
+    }
+
+    String sessionConfValue = sessionConf.get(SparkSQLConfigs.CHECK_ORDERING, null);
+    if (sessionConfValue != null) {
+      return Boolean.parseBoolean(sessionConfValue);
+    }
+
+    return true;
+  }
+
+  /**
+   * Enables writing a timestamp with time zone as a timestamp without time zone.
+   * <p>
+   * Generally, this is not safe as a timestamp without time zone is supposed to represent the wall-clock time,
+   * i.e. no matter the reader/writer timezone 3PM should always be read as 3PM,
+   * but a timestamp with time zone represents instant semantics, i.e. the timestamp
+   * is adjusted so that the corresponding time in the reader timezone is displayed.
+   * <p>
+   * When set to false (default), an exception must be thrown if the table contains a timestamp without time zone.
+   *
+   * @return boolean indicating if writing timestamps without timezone is allowed
+   */
+  public boolean handleTimestampWithoutZone() {
+    String writeOptionValue = writeOptions.get(SparkWriteOptions.HANDLE_TIMESTAMP_WITHOUT_TIMEZONE);
+    if (writeOptionValue != null) {
+      return Boolean.parseBoolean(writeOptionValue);
+    }
+
+    String sessionConfValue = sessionConf.get(SparkSQLConfigs.HANDLE_TIMESTAMP_WITHOUT_TIMEZONE, null);
+    if (sessionConfValue != null) {
+      return Boolean.parseBoolean(sessionConfValue);
+    }
+
+    return false;
+  }
+
+  public String overwriteMode() {
+    String overwriteMode = writeOptions.get("overwrite-mode");
+    return overwriteMode != null ? overwriteMode.toLowerCase(Locale.ROOT) : null;
+  }
+
+  public String wapId() {
+    return sessionConf.get("spark.wap.id", null);
+  }
+
+  public FileFormat dataFileFormat() {
+    String writeOptionValue = writeOptions.get(SparkWriteOptions.WRITE_FORMAT);

Review comment:
       This is more of the code I think we can simplify by adding a common helper:
   
   `getConfiguredString(String name, String defaultValue)`






[GitHub] [iceberg] rdblue commented on a change in pull request #3132: Spark: Add SparkReadConf and SparkWriteConf

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #3132:
URL: https://github.com/apache/iceberg/pull/3132#discussion_r725394379



##########
File path: spark/src/main/java/org/apache/iceberg/spark/SparkUtil.java
##########
@@ -135,43 +130,16 @@ public static boolean hasTimestampWithoutZone(Schema schema) {
   }
 
   /**
-   * Allow reading/writing timestamp without time zone as timestamp with time zone. Generally,
-   * this is not safe as timestamp without time zone is supposed to represent wall clock time semantics,
-   * i.e. no matter the reader/writer timezone 3PM should always be read as 3PM,
-   * but timestamp with time zone represents instant semantics, i.e the timestamp
-   * is adjusted so that the corresponding time in the reader timezone is displayed.
-   * When set to false (default), we throw an exception at runtime
-   * "Spark does not support timestamp without time zone fields" if reading timestamp without time zone fields
+   * Checks whether timestamp types for new tables should be stored without timezone info.
+   * <p>
+   * The default value is false and all timestamp fields are stored as {@link Types.TimestampType#withZone()}.
+   * If enabled, all timestamp fields in new tables will be stored as {@link Types.TimestampType#withoutZone()}.
    *
-   * @param readerConfig table read options
-   * @param sessionConf spark session configurations
-   * @return boolean indicating if reading timestamps without timezone is allowed
-   */
-  public static boolean canHandleTimestampWithoutZone(Map<String, String> readerConfig, RuntimeConfig sessionConf) {
-    String readerOption = readerConfig.get(HANDLE_TIMESTAMP_WITHOUT_TIMEZONE);
-    if (readerOption != null) {
-      return Boolean.parseBoolean(readerOption);
-    }
-    String sessionConfValue = sessionConf.get(HANDLE_TIMESTAMP_WITHOUT_TIMEZONE, null);
-    if (sessionConfValue != null) {
-      return Boolean.parseBoolean(sessionConfValue);
-    }
-    return false;
-  }
-
-  /**
-   * Check whether the spark session config contains a {@link SparkUtil#USE_TIMESTAMP_WITHOUT_TIME_ZONE_IN_NEW_TABLES}
-   * property.
-   * Default value - false
-   * If true in new table all timestamp fields will be stored as {@link Types.TimestampType#withoutZone()},
-   * otherwise {@link Types.TimestampType#withZone()} will be used
-   *
-   * @param sessionConf a spark runtime config
-   * @return true if the session config has {@link SparkUtil#USE_TIMESTAMP_WITHOUT_TIME_ZONE_IN_NEW_TABLES} property
-   * and this property is set to true
+   * @param sessionConf a Spark runtime config
+   * @return true if timestamp types for new tables should be stored without timezone info
    */
   public static boolean useTimestampWithoutZoneInNewTables(RuntimeConfig sessionConf) {
-    String sessionConfValue = sessionConf.get(USE_TIMESTAMP_WITHOUT_TIME_ZONE_IN_NEW_TABLES, null);
+    String sessionConfValue = sessionConf.get(SparkSQLConfigs.USE_TIMESTAMP_WITHOUT_TIME_ZONE_IN_NEW_TABLES, null);

Review comment:
       Any plan to make `SparkConfParser` handle these one-off cases?






[GitHub] [iceberg] aokolnychyi commented on a change in pull request #3132: Spark: Add SparkReadConf and SparkWriteConf

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #3132:
URL: https://github.com/apache/iceberg/pull/3132#discussion_r715890427



##########
File path: spark/src/main/java/org/apache/iceberg/spark/SparkConfParser.java
##########
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark;
+
+import java.util.Map;
+import java.util.function.Function;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.spark.sql.RuntimeConfig;
+import org.apache.spark.sql.SparkSession;
+
+class SparkConfParser {

Review comment:
       I tried to reduce the duplication by introducing this class. Let me know if I am over-engineering.
   It is package-private on purpose and is used in the read and write conf classes.






[GitHub] [iceberg] rdblue commented on a change in pull request #3132: Spark: Add SparkReadConf and SparkWriteConf

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #3132:
URL: https://github.com/apache/iceberg/pull/3132#discussion_r725395015



##########
File path: spark2/src/main/java/org/apache/iceberg/spark/source/IcebergSource.java
##########
@@ -83,19 +83,21 @@ public DataSourceReader createReader(StructType readSchema, DataSourceOptions op
         "Save mode %s is not supported", mode);
     Configuration conf = new Configuration(lazyBaseConf());
     Table table = getTableAndResolveHadoopConfiguration(options, conf);
-    boolean handleTimestampWithoutZone =
-            SparkUtil.canHandleTimestampWithoutZone(options.asMap(), lazySparkSession().conf());
-    Preconditions.checkArgument(handleTimestampWithoutZone || !SparkUtil.hasTimestampWithoutZone(table.schema()),
-            SparkUtil.TIMESTAMP_WITHOUT_TIMEZONE_ERROR);
+    SparkWriteConf writeConf = new SparkWriteConf(lazySparkSession(), table, options.asMap());

Review comment:
       `options.asMap()` returns a lower-cased map. Should we ensure that we use case-insensitive lookups in that map for Spark options?
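
One way to make such lookups independent of key casing is to copy the options into a map ordered by a case-insensitive comparator. The sketch below is an assumption, not what the PR does: `CaseInsensitiveOptions` is a hypothetical wrapper, and it relies only on `java.util.TreeMap` and `String.CASE_INSENSITIVE_ORDER`.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical wrapper: looks up option values regardless of the casing of the key.
final class CaseInsensitiveOptions {

  // TreeMap with a case-insensitive comparator treats "Foo" and "foo" as the same key.
  private final Map<String, String> options = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);

  CaseInsensitiveOptions(Map<String, String> source) {
    options.putAll(source);
  }

  // Returns the value for the given option name, or null if it is not set.
  String get(String name) {
    return options.get(name);
  }
}
```

With this wrapper, a constant such as `"Check-Nullability"` would still find a value stored under `"check-nullability"`. In Spark 3, the DataSource V2 API already passes options as a `CaseInsensitiveStringMap`, so a wrapper like this would mostly matter for the Spark 2 path.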






[GitHub] [iceberg] aokolnychyi commented on a change in pull request #3132: Spark: Add SparkReadConf and SparkWriteConf

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #3132:
URL: https://github.com/apache/iceberg/pull/3132#discussion_r726812449



##########
File path: spark/src/main/java/org/apache/iceberg/spark/SparkUtil.java
##########
@@ -135,43 +130,16 @@ public static boolean hasTimestampWithoutZone(Schema schema) {
   }
 
   /**
-   * Allow reading/writing timestamp without time zone as timestamp with time zone. Generally,
-   * this is not safe as timestamp without time zone is supposed to represent wall clock time semantics,
-   * i.e. no matter the reader/writer timezone 3PM should always be read as 3PM,
-   * but timestamp with time zone represents instant semantics, i.e the timestamp
-   * is adjusted so that the corresponding time in the reader timezone is displayed.
-   * When set to false (default), we throw an exception at runtime
-   * "Spark does not support timestamp without time zone fields" if reading timestamp without time zone fields
+   * Checks whether timestamp types for new tables should be stored without timezone info.
+   * <p>
+   * The default value is false and all timestamp fields are stored as {@link Types.TimestampType#withZone()}.
+   * If enabled, all timestamp fields in new tables will be stored as {@link Types.TimestampType#withoutZone()}.
    *
-   * @param readerConfig table read options
-   * @param sessionConf spark session configurations
-   * @return boolean indicating if reading timestamps without timezone is allowed
-   */
-  public static boolean canHandleTimestampWithoutZone(Map<String, String> readerConfig, RuntimeConfig sessionConf) {
-    String readerOption = readerConfig.get(HANDLE_TIMESTAMP_WITHOUT_TIMEZONE);
-    if (readerOption != null) {
-      return Boolean.parseBoolean(readerOption);
-    }
-    String sessionConfValue = sessionConf.get(HANDLE_TIMESTAMP_WITHOUT_TIMEZONE, null);
-    if (sessionConfValue != null) {
-      return Boolean.parseBoolean(sessionConfValue);
-    }
-    return false;
-  }
-
-  /**
-   * Check whether the spark session config contains a {@link SparkUtil#USE_TIMESTAMP_WITHOUT_TIME_ZONE_IN_NEW_TABLES}
-   * property.
-   * Default value - false
-   * If true in new table all timestamp fields will be stored as {@link Types.TimestampType#withoutZone()},
-   * otherwise {@link Types.TimestampType#withZone()} will be used
-   *
-   * @param sessionConf a spark runtime config
-   * @return true if the session config has {@link SparkUtil#USE_TIMESTAMP_WITHOUT_TIME_ZONE_IN_NEW_TABLES} property
-   * and this property is set to true
+   * @param sessionConf a Spark runtime config
+   * @return true if timestamp types for new tables should be stored without timezone info
    */
   public static boolean useTimestampWithoutZoneInNewTables(RuntimeConfig sessionConf) {
-    String sessionConfValue = sessionConf.get(USE_TIMESTAMP_WITHOUT_TIME_ZONE_IN_NEW_TABLES, null);
+    String sessionConfValue = sessionConf.get(SparkSQLConfigs.USE_TIMESTAMP_WITHOUT_TIME_ZONE_IN_NEW_TABLES, null);

Review comment:
       This is the only place right now. We will also get rid of it in 3.2. We may consider handling it in the future, though.






[GitHub] [iceberg] aokolnychyi commented on a change in pull request #3132: Spark: Add SparkReadConf and SparkWriteConf

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #3132:
URL: https://github.com/apache/iceberg/pull/3132#discussion_r715891342



##########
File path: spark/src/main/java/org/apache/iceberg/spark/SparkConfParser.java
##########
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark;
+
+import java.util.Map;
+import java.util.function.Function;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.spark.sql.RuntimeConfig;
+import org.apache.spark.sql.SparkSession;
+
+class SparkConfParser {
+
+  private final Map<String, String> properties;
+  private final RuntimeConfig sessionConf;
+  private final Map<String, String> options;
+
+  SparkConfParser(SparkSession spark, Table table, Map<String, String> options) {
+    this.properties = table.properties();
+    this.sessionConf = spark.conf();
+    this.options = options;
+  }
+
+  public BooleanConfParser booleanConf() {
+    return new BooleanConfParser();
+  }
+
+  public IntConfParser intConf() {
+    return new IntConfParser();
+  }
+
+  public LongConfParser longConf() {
+    return new LongConfParser();
+  }
+
+  public StringConfParser stringConf() {
+    return new StringConfParser();
+  }
+
+  class BooleanConfParser extends ConfParser<BooleanConfParser, Boolean> {
+    private Boolean defaultValue;
+
+    @Override
+    protected BooleanConfParser self() {
+      return this;
+    }
+
+    public BooleanConfParser defaultValue(boolean value) {
+      this.defaultValue = value;
+      return self();
+    }
+
+    public boolean parse() {
+      Preconditions.checkArgument(defaultValue != null, "Default value cannot be null");
+      return parse(Boolean::parseBoolean, defaultValue);
+    }
+  }
+
+  class IntConfParser extends ConfParser<IntConfParser, Integer> {
+    private Integer defaultValue;
+
+    @Override
+    protected IntConfParser self() {
+      return this;
+    }
+
+    public IntConfParser defaultValue(int value) {
+      this.defaultValue = value;
+      return self();
+    }
+
+    public int parse() {
+      Preconditions.checkArgument(defaultValue != null, "Default value cannot be null");
+      return parse(Integer::parseInt, defaultValue);
+    }
+  }
+
+  class LongConfParser extends ConfParser<LongConfParser, Long> {
+    private Long defaultValue;
+
+    @Override
+    protected LongConfParser self() {
+      return this;
+    }
+
+    public LongConfParser defaultValue(long value) {
+      this.defaultValue = value;
+      return self();
+    }
+
+    public long parse() {
+      Preconditions.checkArgument(defaultValue != null, "Default value cannot be null");
+      return parse(Long::parseLong, defaultValue);
+    }
+  }
+
+  class StringConfParser extends ConfParser<StringConfParser, String> {
+    private String defaultValue;
+
+    @Override
+    protected StringConfParser self() {
+      return this;
+    }
+
+    public StringConfParser defaultValue(String value) {
+      this.defaultValue = value;
+      return self();
+    }
+
+    public String parse() {
+      Preconditions.checkArgument(defaultValue != null, "Default value cannot be null");

Review comment:
       Maybe I should drop this precondition for strings.
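
For reference, here is a stripped-down, standalone sketch of a fluent parser in which only the boolean parser insists on a default, while the string parser may return null. It is an illustration under assumptions, not the PR's `SparkConfParser`: plain maps replace the Spark session and the table, and the `option`, `sessionConf`, and `tableProperty` setter names are hypothetical.

```java
import java.util.Map;
import java.util.function.Function;

// Hypothetical stand-in for a conf parser; plain maps replace SparkSession and Table.
final class ConfParserSketch {
  private final Map<String, String> options;
  private final Map<String, String> sessionConf;
  private final Map<String, String> tableProperties;

  ConfParserSketch(
      Map<String, String> options,
      Map<String, String> sessionConf,
      Map<String, String> tableProperties) {
    this.options = options;
    this.sessionConf = sessionConf;
    this.tableProperties = tableProperties;
  }

  BooleanParser booleanConf() {
    return new BooleanParser();
  }

  StringParser stringConf() {
    return new StringParser();
  }

  // Shared lookup logic: option first, then session conf, then table property.
  abstract class Parser<ThisT, T> {
    private String optionName;
    private String sessionConfName;
    private String tablePropertyName;

    protected abstract ThisT self();

    ThisT option(String name) {
      this.optionName = name;
      return self();
    }

    ThisT sessionConf(String name) {
      this.sessionConfName = name;
      return self();
    }

    ThisT tableProperty(String name) {
      this.tablePropertyName = name;
      return self();
    }

    protected T parse(Function<String, T> conversion, T defaultValue) {
      String value = lookup(options, optionName);
      if (value == null) {
        value = lookup(sessionConf, sessionConfName);
      }
      if (value == null) {
        value = lookup(tableProperties, tablePropertyName);
      }
      return value != null ? conversion.apply(value) : defaultValue;
    }

    // Skips sources for which no key name was configured.
    private String lookup(Map<String, String> source, String name) {
      return name != null ? source.get(name) : null;
    }
  }

  final class BooleanParser extends Parser<BooleanParser, Boolean> {
    private Boolean defaultValue;

    @Override
    protected BooleanParser self() {
      return this;
    }

    BooleanParser defaultValue(boolean value) {
      this.defaultValue = value;
      return self();
    }

    boolean parse() {
      // A primitive result needs a default, so a missing one is rejected.
      if (defaultValue == null) {
        throw new IllegalArgumentException("Default value cannot be null");
      }
      return parse(Boolean::parseBoolean, defaultValue);
    }
  }

  final class StringParser extends Parser<StringParser, String> {
    private String defaultValue; // may stay null: no precondition for strings

    @Override
    protected StringParser self() {
      return this;
    }

    StringParser defaultValue(String value) {
      this.defaultValue = value;
      return self();
    }

    String parse() {
      return parse(Function.identity(), defaultValue);
    }
  }
}
```

A call such as `parser.stringConf().option("some-option").parse()` then returns null when nothing is set, which suits optional settings that have no natural default.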






[GitHub] [iceberg] aokolnychyi commented on a change in pull request #3132: Spark: Add SparkReadConf and SparkWriteConf

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #3132:
URL: https://github.com/apache/iceberg/pull/3132#discussion_r715781178



##########
File path: spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java
##########
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark;
+
+import java.util.Map;
+import java.util.Set;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.hadoop.HadoopInputFile;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
+import org.apache.iceberg.util.PropertyUtil;
+import org.apache.spark.sql.RuntimeConfig;
+import org.apache.spark.sql.SparkSession;
+
+/**
+ * A class for common Iceberg configs for Spark reads.
+ * <p>
+ * If a config is set at multiple levels, the following order of precedence is used (top to bottom):
+ * <ol>
+ *   <li>Read options</li>
+ *   <li>Session configuration</li>
+ *   <li>Table metadata</li>
+ * </ol>
+ * The most specific value is set in read options and takes precedence over all other configs.
+ * If no read option is provided, this class checks the session configuration for any overrides.
+ * If no applicable value is found in the session configuration, this class uses the table metadata.
+ * <p>
+ * Note this class is NOT meant to be serialized and sent to executors.
+ */
+public class SparkReadConf {
+
+  private static final Set<String> LOCALITY_WHITELIST_FS = ImmutableSet.of("hdfs");
+
+  private final Table table;
+  private final RuntimeConfig sessionConf;
+  private final Map<String, String> readOptions;
+
+  public SparkReadConf(SparkSession spark, Table table, Map<String, String> readOptions) {
+    this.table = table;
+    this.sessionConf = spark.conf();
+    this.readOptions = readOptions;
+  }
+
+  public boolean localityEnabled() {
+    InputFile file = table.io().newInputFile(table.location());
+
+    if (file instanceof HadoopInputFile) {
+      String scheme = ((HadoopInputFile) file).getFileSystem().getScheme();
+      boolean defaultValue = LOCALITY_WHITELIST_FS.contains(scheme);
+      return PropertyUtil.propertyAsBoolean(
+          readOptions,
+          SparkReadOptions.LOCALITY,
+          defaultValue);
+    }
+
+    return false;
+  }
+
+  public boolean vectorizationEnabled(FileFormat fileFormat) {
+    String readOptionValue = readOptions.get(SparkReadOptions.VECTORIZATION_ENABLED);
+    if (readOptionValue != null) {
+      return Boolean.parseBoolean(readOptionValue);
+    }
+
+    String sessionConfValue = sessionConf.get(SparkSQLConfigs.VECTORIZATION_ENABLED, null);
+    if (sessionConfValue != null) {
+      return Boolean.parseBoolean(sessionConfValue);
+    }
+
+    switch (fileFormat) {
+      case PARQUET:
+        return PropertyUtil.propertyAsBoolean(
+            table.properties(),
+            TableProperties.PARQUET_VECTORIZATION_ENABLED,
+            TableProperties.PARQUET_VECTORIZATION_ENABLED_DEFAULT);
+
+      case ORC:
+        return PropertyUtil.propertyAsBoolean(
+            table.properties(),
+            TableProperties.ORC_VECTORIZATION_ENABLED,
+            TableProperties.ORC_VECTORIZATION_ENABLED_DEFAULT);
+
+      default:
+        return false;
+    }
+  }
+
+  public int batchSize(FileFormat fileFormat) {

Review comment:
       Are you proposing we move `vectorizationEnabled` and `batchSize` out of this class to the readers?
   By putting this logic here, I was hoping to reduce the duplication between Spark 2 and 3.
   
   I can introduce separate configs for ORC and Parquet if that seems cleaner.






[GitHub] [iceberg] RussellSpitzer commented on a change in pull request #3132: Spark: Add SparkReadConf and SparkWriteConf

Posted by GitBox <gi...@apache.org>.
RussellSpitzer commented on a change in pull request #3132:
URL: https://github.com/apache/iceberg/pull/3132#discussion_r715174045



##########
File path: spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java
##########
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark;
+
+import java.util.Map;
+import java.util.Set;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.hadoop.HadoopInputFile;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
+import org.apache.iceberg.util.PropertyUtil;
+import org.apache.spark.sql.RuntimeConfig;
+import org.apache.spark.sql.SparkSession;
+
+/**
+ * A class for common Iceberg configs for Spark reads.
+ * <p>
+ * If a config is set at multiple levels, the following order of precedence is used (top to bottom):
+ * <ol>
+ *   <li>Read options</li>
+ *   <li>Session configuration</li>
+ *   <li>Table metadata</li>
+ * </ol>
+ * The most specific value is set in read options and takes precedence over all other configs.
+ * If no read option is provided, this class checks the session configuration for any overrides.
+ * If no applicable value is found in the session configuration, this class uses the table metadata.
+ * <p>
+ * Note this class is NOT meant to be serialized and sent to executors.
+ */
+public class SparkReadConf {
+
+  private static final Set<String> LOCALITY_WHITELIST_FS = ImmutableSet.of("hdfs");
+
+  private final Table table;
+  private final RuntimeConfig sessionConf;
+  private final Map<String, String> readOptions;
+
+  public SparkReadConf(SparkSession spark, Table table, Map<String, String> readOptions) {
+    this.table = table;
+    this.sessionConf = spark.conf();
+    this.readOptions = readOptions;
+  }
+
+  public boolean localityEnabled() {
+    InputFile file = table.io().newInputFile(table.location());
+
+    if (file instanceof HadoopInputFile) {
+      String scheme = ((HadoopInputFile) file).getFileSystem().getScheme();
+      boolean defaultValue = LOCALITY_WHITELIST_FS.contains(scheme);
+      return PropertyUtil.propertyAsBoolean(
+          readOptions,
+          SparkReadOptions.LOCALITY,
+          defaultValue);
+    }
+
+    return false;
+  }
+
+  public boolean vectorizationEnabled(FileFormat fileFormat) {
+    String readOptionValue = readOptions.get(SparkReadOptions.VECTORIZATION_ENABLED);
+    if (readOptionValue != null) {
+      return Boolean.parseBoolean(readOptionValue);
+    }
+
+    String sessionConfValue = sessionConf.get(SparkSQLConfigs.VECTORIZATION_ENABLED, null);
+    if (sessionConfValue != null) {
+      return Boolean.parseBoolean(sessionConfValue);
+    }
+
+    switch (fileFormat) {
+      case PARQUET:
+        return PropertyUtil.propertyAsBoolean(
+            table.properties(),
+            TableProperties.PARQUET_VECTORIZATION_ENABLED,
+            TableProperties.PARQUET_VECTORIZATION_ENABLED_DEFAULT);
+
+      case ORC:
+        return PropertyUtil.propertyAsBoolean(
+            table.properties(),
+            TableProperties.ORC_VECTORIZATION_ENABLED,
+            TableProperties.ORC_VECTORIZATION_ENABLED_DEFAULT);
+
+      default:
+        return false;
+    }
+  }
+
+  public int batchSize(FileFormat fileFormat) {
+    String readOptionValue = readOptions.get(SparkReadOptions.VECTORIZATION_BATCH_SIZE);
+    if (readOptionValue != null) {
+      return Integer.parseInt(readOptionValue);
+    }
+
+    switch (fileFormat) {
+      case PARQUET:
+        return PropertyUtil.propertyAsInt(
+            table.properties(),
+            TableProperties.PARQUET_BATCH_SIZE,
+            TableProperties.PARQUET_BATCH_SIZE_DEFAULT);
+
+      case ORC:
+        return PropertyUtil.propertyAsInt(
+            table.properties(),
+            TableProperties.ORC_BATCH_SIZE,
+            TableProperties.ORC_BATCH_SIZE_DEFAULT);
+
+      default:
+        throw new IllegalArgumentException("File format does not support batch reads: " + fileFormat);
+    }
+  }
+
+  public long splitSize() {
+    String readOptionValue = readOptions.get(SparkReadOptions.SPLIT_SIZE);

Review comment:
       We seem to use this pattern a lot, so it may make sense to have a helper such as
   
   getConfiguredLong(String name, Long default)
   getConfiguredLong(SparkReadOptions.SPLIT_SIZE, TableProperties.SPLIT_SIZE_DEFAULT)
   
   I wish our "propertyAsX" methods were parameterized; then I would say just have
   
   getConfigured[X](String name, X default) : X
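
A minimal sketch of the parameterized helper suggested here, assuming a plain Map of read options. The class name ConfiguredValueSketch, the generic getConfigured method, and the "split-size" key and default used in main are illustrative assumptions, not code from the PR or from PropertyUtil.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical helper for illustration only; not part of Iceberg.
final class ConfiguredValueSketch {
  private final Map<String, String> readOptions;

  ConfiguredValueSketch(Map<String, String> readOptions) {
    this.readOptions = readOptions;
  }

  // Returns the parsed option when it is set, otherwise the supplied default.
  <T> T getConfigured(String name, T defaultValue, Function<String, T> parser) {
    String value = readOptions.get(name);
    return value != null ? parser.apply(value) : defaultValue;
  }

  // Typed convenience wrapper built on the generic method.
  long getConfiguredLong(String name, long defaultValue) {
    return getConfigured(name, defaultValue, Long::parseLong);
  }

  public static void main(String[] args) {
    Map<String, String> options = new HashMap<>();
    options.put("split-size", "1048576"); // assumed key, for illustration
    ConfiguredValueSketch conf = new ConfiguredValueSketch(options);

    // The option is set, so the default is ignored.
    System.out.println(conf.getConfiguredLong("split-size", 134217728L));
    // The option is missing, so the default is returned.
    System.out.println(conf.getConfigured("missing", true, Boolean::parseBoolean));
  }
}
```

Passing the parser as a Function keeps a single lookup path for every value type, which is roughly what a parameterized "propertyAsX" would offer.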




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] aokolnychyi commented on a change in pull request #3132: Spark: Add SparkReadConf and SparkWriteConf

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #3132:
URL: https://github.com/apache/iceberg/pull/3132#discussion_r726814306



##########
File path: spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java
##########
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark;
+
+import java.util.Map;
+import java.util.Set;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.hadoop.HadoopInputFile;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
+import org.apache.iceberg.util.PropertyUtil;
+import org.apache.spark.sql.RuntimeConfig;
+import org.apache.spark.sql.SparkSession;
+
+/**
+ * A class for common Iceberg configs for Spark reads.
+ * <p>
+ * If a config is set at multiple levels, the following order of precedence is used (top to bottom):
+ * <ol>
+ *   <li>Read options</li>
+ *   <li>Session configuration</li>
+ *   <li>Table metadata</li>
+ * </ol>
+ * The most specific value is set in read options and takes precedence over all other configs.
+ * If no read option is provided, this class checks the session configuration for any overrides.
+ * If no applicable value is found in the session configuration, this class uses the table metadata.
+ * <p>
+ * Note this class is NOT meant to be serialized and sent to executors.
+ */
+public class SparkReadConf {
+
+  private static final Set<String> LOCALITY_WHITELIST_FS = ImmutableSet.of("hdfs");
+
+  private final Table table;
+  private final RuntimeConfig sessionConf;
+  private final Map<String, String> readOptions;
+
+  public SparkReadConf(SparkSession spark, Table table, Map<String, String> readOptions) {
+    this.table = table;
+    this.sessionConf = spark.conf();
+    this.readOptions = readOptions;
+  }
+
+  public boolean localityEnabled() {
+    InputFile file = table.io().newInputFile(table.location());
+
+    if (file instanceof HadoopInputFile) {
+      String scheme = ((HadoopInputFile) file).getFileSystem().getScheme();
+      boolean defaultValue = LOCALITY_WHITELIST_FS.contains(scheme);
+      return PropertyUtil.propertyAsBoolean(
+          readOptions,
+          SparkReadOptions.LOCALITY,
+          defaultValue);
+    }
+
+    return false;
+  }
+
+  public boolean vectorizationEnabled(FileFormat fileFormat) {
+    String readOptionValue = readOptions.get(SparkReadOptions.VECTORIZATION_ENABLED);
+    if (readOptionValue != null) {
+      return Boolean.parseBoolean(readOptionValue);
+    }
+
+    String sessionConfValue = sessionConf.get(SparkSQLConfigs.VECTORIZATION_ENABLED, null);
+    if (sessionConfValue != null) {
+      return Boolean.parseBoolean(sessionConfValue);
+    }
+
+    switch (fileFormat) {
+      case PARQUET:
+        return PropertyUtil.propertyAsBoolean(
+            table.properties(),
+            TableProperties.PARQUET_VECTORIZATION_ENABLED,
+            TableProperties.PARQUET_VECTORIZATION_ENABLED_DEFAULT);
+
+      case ORC:
+        return PropertyUtil.propertyAsBoolean(
+            table.properties(),
+            TableProperties.ORC_VECTORIZATION_ENABLED,
+            TableProperties.ORC_VECTORIZATION_ENABLED_DEFAULT);
+
+      default:
+        return false;
+    }
+  }
+
+  public int batchSize(FileFormat fileFormat) {
+    String readOptionValue = readOptions.get(SparkReadOptions.VECTORIZATION_BATCH_SIZE);
+    if (readOptionValue != null) {
+      return Integer.parseInt(readOptionValue);
+    }
+
+    switch (fileFormat) {
+      case PARQUET:
+        return PropertyUtil.propertyAsInt(
+            table.properties(),
+            TableProperties.PARQUET_BATCH_SIZE,
+            TableProperties.PARQUET_BATCH_SIZE_DEFAULT);
+
+      case ORC:
+        return PropertyUtil.propertyAsInt(
+            table.properties(),
+            TableProperties.ORC_BATCH_SIZE,
+            TableProperties.ORC_BATCH_SIZE_DEFAULT);
+
+      default:
+        throw new IllegalArgumentException("File format does not support batch reads: " + fileFormat);
+    }
+  }
+
+  public long splitSize() {
+    String readOptionValue = readOptions.get(SparkReadOptions.SPLIT_SIZE);

Review comment:
       Fixed.






[GitHub] [iceberg] aokolnychyi commented on a change in pull request #3132: Spark: Add SparkReadConf and SparkWriteConf

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #3132:
URL: https://github.com/apache/iceberg/pull/3132#discussion_r726813757



##########
File path: spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java
##########
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark;
+
+import java.util.Map;
+import java.util.Set;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.hadoop.HadoopInputFile;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
+import org.apache.iceberg.util.PropertyUtil;
+import org.apache.spark.sql.RuntimeConfig;
+import org.apache.spark.sql.SparkSession;
+
+/**
+ * A class for common Iceberg configs for Spark reads.
+ * <p>
+ * If a config is set at multiple levels, the following order of precedence is used (top to bottom):
+ * <ol>
+ *   <li>Read options</li>
+ *   <li>Session configuration</li>
+ *   <li>Table metadata</li>
+ * </ol>
+ * The most specific value is set in read options and takes precedence over all other configs.
+ * If no read option is provided, this class checks the session configuration for any overrides.
+ * If no applicable value is found in the session configuration, this class uses the table metadata.
+ * <p>
+ * Note this class is NOT meant to be serialized and sent to executors.
+ */
+public class SparkReadConf {
+
+  private static final Set<String> LOCALITY_WHITELIST_FS = ImmutableSet.of("hdfs");
+
+  private final Table table;
+  private final RuntimeConfig sessionConf;
+  private final Map<String, String> readOptions;
+
+  public SparkReadConf(SparkSession spark, Table table, Map<String, String> readOptions) {
+    this.table = table;
+    this.sessionConf = spark.conf();
+    this.readOptions = readOptions;
+  }
+
+  public boolean localityEnabled() {
+    InputFile file = table.io().newInputFile(table.location());
+
+    if (file instanceof HadoopInputFile) {
+      String scheme = ((HadoopInputFile) file).getFileSystem().getScheme();
+      boolean defaultValue = LOCALITY_WHITELIST_FS.contains(scheme);
+      return PropertyUtil.propertyAsBoolean(
+          readOptions,
+          SparkReadOptions.LOCALITY,
+          defaultValue);
+    }
+
+    return false;
+  }
+
+  public boolean vectorizationEnabled(FileFormat fileFormat) {
+    String readOptionValue = readOptions.get(SparkReadOptions.VECTORIZATION_ENABLED);
+    if (readOptionValue != null) {
+      return Boolean.parseBoolean(readOptionValue);
+    }
+
+    String sessionConfValue = sessionConf.get(SparkSQLConfigs.VECTORIZATION_ENABLED, null);
+    if (sessionConfValue != null) {
+      return Boolean.parseBoolean(sessionConfValue);
+    }
+
+    switch (fileFormat) {
+      case PARQUET:
+        return PropertyUtil.propertyAsBoolean(
+            table.properties(),
+            TableProperties.PARQUET_VECTORIZATION_ENABLED,
+            TableProperties.PARQUET_VECTORIZATION_ENABLED_DEFAULT);
+
+      case ORC:
+        return PropertyUtil.propertyAsBoolean(
+            table.properties(),
+            TableProperties.ORC_VECTORIZATION_ENABLED,
+            TableProperties.ORC_VECTORIZATION_ENABLED_DEFAULT);
+
+      default:
+        return false;
+    }
+  }
+
+  public int batchSize(FileFormat fileFormat) {

Review comment:
       Switched.






[GitHub] [iceberg] rdblue commented on a change in pull request #3132: Spark: Add SparkReadConf and SparkWriteConf

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #3132:
URL: https://github.com/apache/iceberg/pull/3132#discussion_r725392250



##########
File path: spark/src/main/java/org/apache/iceberg/spark/SparkConfParser.java
##########
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark;
+
+import java.util.Map;
+import java.util.function.Function;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.spark.sql.RuntimeConfig;
+import org.apache.spark.sql.SparkSession;
+
+class SparkConfParser {
+
+  private final Map<String, String> properties;
+  private final RuntimeConfig sessionConf;
+  private final Map<String, String> options;
+
+  SparkConfParser(SparkSession spark, Table table, Map<String, String> options) {
+    this.properties = table.properties();
+    this.sessionConf = spark.conf();
+    this.options = options;
+  }
+
+  public BooleanConfParser booleanConf() {
+    return new BooleanConfParser();
+  }
+
+  public IntConfParser intConf() {
+    return new IntConfParser();
+  }
+
+  public LongConfParser longConf() {
+    return new LongConfParser();
+  }
+
+  public StringConfParser stringConf() {
+    return new StringConfParser();
+  }
+
+  class BooleanConfParser extends ConfParser<BooleanConfParser, Boolean> {
+    private Boolean defaultValue;
+
+    @Override
+    protected BooleanConfParser self() {
+      return this;
+    }
+
+    public BooleanConfParser defaultValue(boolean value) {
+      this.defaultValue = value;
+      return self();
+    }
+
+    public boolean parse() {
+      Preconditions.checkArgument(defaultValue != null, "Default value cannot be null");
+      return parse(Boolean::parseBoolean, defaultValue);
+    }
+  }
+
+  class IntConfParser extends ConfParser<IntConfParser, Integer> {
+    private Integer defaultValue;
+
+    @Override
+    protected IntConfParser self() {
+      return this;
+    }
+
+    public IntConfParser defaultValue(int value) {
+      this.defaultValue = value;
+      return self();
+    }
+
+    public int parse() {
+      Preconditions.checkArgument(defaultValue != null, "Default value cannot be null");
+      return parse(Integer::parseInt, defaultValue);
+    }
+  }
+
+  class LongConfParser extends ConfParser<LongConfParser, Long> {
+    private Long defaultValue;
+
+    @Override
+    protected LongConfParser self() {
+      return this;
+    }
+
+    public LongConfParser defaultValue(long value) {
+      this.defaultValue = value;
+      return self();
+    }
+
+    public long parse() {
+      Preconditions.checkArgument(defaultValue != null, "Default value cannot be null");
+      return parse(Long::parseLong, defaultValue);
+    }
+  }
+
+  class StringConfParser extends ConfParser<StringConfParser, String> {
+    private String defaultValue;
+
+    @Override
+    protected StringConfParser self() {
+      return this;
+    }
+
+    public StringConfParser defaultValue(String value) {
+      this.defaultValue = value;
+      return self();
+    }
+
+    public String parse() {
+      Preconditions.checkArgument(defaultValue != null, "Default value cannot be null");

Review comment:
       We can always do that later.






[GitHub] [iceberg] aokolnychyi commented on a change in pull request #3132: Spark: Add SparkReadConf and SparkWriteConf

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #3132:
URL: https://github.com/apache/iceberg/pull/3132#discussion_r726811801



##########
File path: spark2/src/main/java/org/apache/iceberg/spark/source/IcebergSource.java
##########
@@ -83,19 +83,21 @@ public DataSourceReader createReader(StructType readSchema, DataSourceOptions op
         "Save mode %s is not supported", mode);
     Configuration conf = new Configuration(lazyBaseConf());
     Table table = getTableAndResolveHadoopConfiguration(options, conf);
-    boolean handleTimestampWithoutZone =
-            SparkUtil.canHandleTimestampWithoutZone(options.asMap(), lazySparkSession().conf());
-    Preconditions.checkArgument(handleTimestampWithoutZone || !SparkUtil.hasTimestampWithoutZone(table.schema()),
-            SparkUtil.TIMESTAMP_WITHOUT_TIMEZONE_ERROR);
+    SparkWriteConf writeConf = new SparkWriteConf(lazySparkSession(), table, options.asMap());

Review comment:
       Fixed in the conf parser by transforming the key.






[GitHub] [iceberg] aokolnychyi merged pull request #3132: Spark: Add SparkReadConf and SparkWriteConf

Posted by GitBox <gi...@apache.org>.
aokolnychyi merged pull request #3132:
URL: https://github.com/apache/iceberg/pull/3132


   




[GitHub] [iceberg] aokolnychyi commented on a change in pull request #3132: Spark: Add SparkReadConf and SparkWriteConf

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #3132:
URL: https://github.com/apache/iceberg/pull/3132#discussion_r715781712



##########
File path: spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java
##########
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark;
+
+import java.util.Map;
+import java.util.Set;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.hadoop.HadoopInputFile;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
+import org.apache.iceberg.util.PropertyUtil;
+import org.apache.spark.sql.RuntimeConfig;
+import org.apache.spark.sql.SparkSession;
+
+/**
+ * A class for common Iceberg configs for Spark reads.
+ * <p>
+ * If a config is set at multiple levels, the following order of precedence is used (top to bottom):
+ * <ol>
+ *   <li>Read options</li>
+ *   <li>Session configuration</li>
+ *   <li>Table metadata</li>
+ * </ol>
+ * The most specific value is set in read options and takes precedence over all other configs.
+ * If no read option is provided, this class checks the session configuration for any overrides.
+ * If no applicable value is found in the session configuration, this class uses the table metadata.
+ * <p>
+ * Note this class is NOT meant to be serialized and sent to executors.
+ */
+public class SparkReadConf {
+
+  private static final Set<String> LOCALITY_WHITELIST_FS = ImmutableSet.of("hdfs");
+
+  private final Table table;
+  private final RuntimeConfig sessionConf;
+  private final Map<String, String> readOptions;
+
+  public SparkReadConf(SparkSession spark, Table table, Map<String, String> readOptions) {
+    this.table = table;
+    this.sessionConf = spark.conf();
+    this.readOptions = readOptions;
+  }
+
+  public boolean localityEnabled() {
+    InputFile file = table.io().newInputFile(table.location());
+
+    if (file instanceof HadoopInputFile) {
+      String scheme = ((HadoopInputFile) file).getFileSystem().getScheme();
+      boolean defaultValue = LOCALITY_WHITELIST_FS.contains(scheme);
+      return PropertyUtil.propertyAsBoolean(
+          readOptions,
+          SparkReadOptions.LOCALITY,
+          defaultValue);
+    }
+
+    return false;
+  }
+
+  public boolean vectorizationEnabled(FileFormat fileFormat) {
+    String readOptionValue = readOptions.get(SparkReadOptions.VECTORIZATION_ENABLED);
+    if (readOptionValue != null) {
+      return Boolean.parseBoolean(readOptionValue);
+    }
+
+    String sessionConfValue = sessionConf.get(SparkSQLConfigs.VECTORIZATION_ENABLED, null);
+    if (sessionConfValue != null) {
+      return Boolean.parseBoolean(sessionConfValue);
+    }
+
+    switch (fileFormat) {
+      case PARQUET:
+        return PropertyUtil.propertyAsBoolean(
+            table.properties(),
+            TableProperties.PARQUET_VECTORIZATION_ENABLED,
+            TableProperties.PARQUET_VECTORIZATION_ENABLED_DEFAULT);
+
+      case ORC:
+        return PropertyUtil.propertyAsBoolean(
+            table.properties(),
+            TableProperties.ORC_VECTORIZATION_ENABLED,
+            TableProperties.ORC_VECTORIZATION_ENABLED_DEFAULT);
+
+      default:
+        return false;
+    }
+  }
+
+  public int batchSize(FileFormat fileFormat) {
+    String readOptionValue = readOptions.get(SparkReadOptions.VECTORIZATION_BATCH_SIZE);
+    if (readOptionValue != null) {
+      return Integer.parseInt(readOptionValue);
+    }
+
+    switch (fileFormat) {
+      case PARQUET:
+        return PropertyUtil.propertyAsInt(
+            table.properties(),
+            TableProperties.PARQUET_BATCH_SIZE,
+            TableProperties.PARQUET_BATCH_SIZE_DEFAULT);
+
+      case ORC:
+        return PropertyUtil.propertyAsInt(
+            table.properties(),
+            TableProperties.ORC_BATCH_SIZE,
+            TableProperties.ORC_BATCH_SIZE_DEFAULT);
+
+      default:
+        throw new IllegalArgumentException("File format does not support batch reads: " + fileFormat);
+    }
+  }
+
+  public long splitSize() {
+    String readOptionValue = readOptions.get(SparkReadOptions.SPLIT_SIZE);

Review comment:
       I will try to come up with something.






[GitHub] [iceberg] rdblue commented on a change in pull request #3132: Spark: Add SparkReadConf and SparkWriteConf

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #3132:
URL: https://github.com/apache/iceberg/pull/3132#discussion_r725392803



##########
File path: spark/src/main/java/org/apache/iceberg/spark/SparkReadOptions.java
##########
@@ -30,6 +30,12 @@ private SparkReadOptions() {
   // Snapshot ID of the table snapshot to read
   public static final String SNAPSHOT_ID = "snapshot-id";
 
+  // Start snapshot ID used in incremental scans (exclusive)
+  public static final String START_SNAPSHOT_ID = "start-snapshot-id";
+
+  // End snapshot ID used in incremental scans (inclusive)
+  public static final String END_SNAPSHOT_ID = "end-snapshot-id";

Review comment:
       I would probably still add them to the conf parser to be consistent.
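
One possible reading of this suggestion, sketched under assumptions: options such as start-snapshot-id have no session or table-level counterpart, but they can still go through a shared lookup helper so that every read setting is resolved the same way. The class OptionalOptionSketch and its longOption method are hypothetical names; returning null for an unset option is an assumed convention, not something the thread confirms.

```java
import java.util.Collections;
import java.util.Map;

// Hypothetical sketch; a plain map stands in for the Spark read options.
final class OptionalOptionSketch {
  private final Map<String, String> options;

  OptionalOptionSketch(Map<String, String> options) {
    this.options = options;
  }

  // Returns the parsed option, or null when the option is not set.
  Long longOption(String key) {
    String value = options.get(key);
    return value != null ? Long.valueOf(value) : null;
  }

  public static void main(String[] args) {
    OptionalOptionSketch conf =
        new OptionalOptionSketch(Collections.singletonMap("start-snapshot-id", "42"));

    System.out.println(conf.longOption("start-snapshot-id")); // 42
    System.out.println(conf.longOption("end-snapshot-id"));   // null
  }
}
```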






[GitHub] [iceberg] jackye1995 commented on pull request #3132: Spark: Add SparkReadConf and SparkWriteConf

Posted by GitBox <gi...@apache.org>.
jackye1995 commented on pull request #3132:
URL: https://github.com/apache/iceberg/pull/3132#issuecomment-926243261


   Thank you for the refactoring! I have been thinking about the same thing for a long time. Regarding the behavior change, I think resolving the configs in any other order is a bug, and we should definitely enforce option -> session -> table metadata wherever that is not already the case.
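
The option -> session -> table metadata order can be sketched as follows. This is a simplified illustration, not the PR's SparkConfParser: plain maps stand in for the read options, Spark's RuntimeConfig, and the table properties, and the class name, method name, and key strings are assumptions.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical sketch of three-level config resolution.
final class PrecedenceSketch {
  private PrecedenceSketch() {
  }

  // Checks read options first, then the session conf, then table properties.
  static <T> T resolve(
      Map<String, String> options, String optionKey,
      Map<String, String> sessionConf, String sessionKey,
      Map<String, String> tableProps, String tableKey,
      Function<String, T> parser, T defaultValue) {
    String optionValue = options.get(optionKey);
    if (optionValue != null) {
      return parser.apply(optionValue);
    }

    String sessionValue = sessionConf.get(sessionKey);
    if (sessionValue != null) {
      return parser.apply(sessionValue);
    }

    String tableValue = tableProps.get(tableKey);
    if (tableValue != null) {
      return parser.apply(tableValue);
    }

    return defaultValue;
  }

  public static void main(String[] args) {
    Map<String, String> options = new HashMap<>();
    Map<String, String> session = new HashMap<>();
    Map<String, String> table = new HashMap<>();
    // Key strings below are assumed for illustration.
    session.put("spark.sql.iceberg.vectorization.enabled", "false");
    table.put("read.parquet.vectorization.enabled", "true");

    boolean enabled = resolve(
        options, "vectorization-enabled",
        session, "spark.sql.iceberg.vectorization.enabled",
        table, "read.parquet.vectorization.enabled",
        Boolean::parseBoolean, false);

    // No read option is set, so the session value wins over the table property.
    System.out.println(enabled); // false
  }
}
```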




[GitHub] [iceberg] RussellSpitzer commented on a change in pull request #3132: Spark: Add SparkReadConf and SparkWriteConf

Posted by GitBox <gi...@apache.org>.
RussellSpitzer commented on a change in pull request #3132:
URL: https://github.com/apache/iceberg/pull/3132#discussion_r715169191



##########
File path: spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java
##########
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark;
+
+import java.util.Map;
+import java.util.Set;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.hadoop.HadoopInputFile;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
+import org.apache.iceberg.util.PropertyUtil;
+import org.apache.spark.sql.RuntimeConfig;
+import org.apache.spark.sql.SparkSession;
+
+/**
+ * A class for common Iceberg configs for Spark reads.
+ * <p>
+ * If a config is set at multiple levels, the following order of precedence is used (top to bottom):
+ * <ol>
+ *   <li>Read options</li>
+ *   <li>Session configuration</li>
+ *   <li>Table metadata</li>
+ * </ol>
+ * The most specific value is set in read options and takes precedence over all other configs.
+ * If no read option is provided, this class checks the session configuration for any overrides.
+ * If no applicable value is found in the session configuration, this classes uses the table metadata.
+ * <p>
+ * Note this class is NOT meant to be serialized and sent to executors.
+ */
+public class SparkReadConf {
+
+  private static final Set<String> LOCALITY_WHITELIST_FS = ImmutableSet.of("hdfs");
+
+  private final Table table;
+  private final RuntimeConfig sessionConf;
+  private final Map<String, String> readOptions;
+
+  public SparkReadConf(SparkSession spark, Table table, Map<String, String> readOptions) {
+    this.table = table;
+    this.sessionConf = spark.conf();
+    this.readOptions = readOptions;
+  }
+
+  public boolean localityEnabled() {
+    InputFile file = table.io().newInputFile(table.location());
+
+    if (file instanceof HadoopInputFile) {
+      String scheme = ((HadoopInputFile) file).getFileSystem().getScheme();
+      boolean defaultValue = LOCALITY_WHITELIST_FS.contains(scheme);
+      return PropertyUtil.propertyAsBoolean(
+          readOptions,
+          SparkReadOptions.LOCALITY,
+          defaultValue);
+    }
+
+    return false;
+  }
+
+  public boolean vectorizationEnabled(FileFormat fileFormat) {
+    String readOptionValue = readOptions.get(SparkReadOptions.VECTORIZATION_ENABLED);
+    if (readOptionValue != null) {
+      return Boolean.parseBoolean(readOptionValue);
+    }
+
+    String sessionConfValue = sessionConf.get(SparkSQLConfigs.VECTORIZATION_ENABLED, null);

Review comment:
       Why let this one be set at the session level but not the others? Just wondering, I thought we wanted all the properties to behave the same way:
   
   check read options, then session conf, then table properties?
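
The uniform lookup order asked about here (read option, then session conf, then table property) could be sketched roughly as follows. This is only an illustration under stated assumptions: `PrecedenceSketch`, `resolveBoolean`, and every key name are hypothetical, and the session conf is modeled as a plain `Map` rather than Spark's `RuntimeConfig`.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper for illustration only; it is not part of Iceberg or Spark.
// The session conf is modeled as a plain Map instead of Spark's RuntimeConfig.
final class PrecedenceSketch {

  private PrecedenceSketch() {
  }

  // Returns the first value found: read option, then session conf, then table property.
  // Falls back to defaultValue when no level defines the config.
  static boolean resolveBoolean(
      Map<String, String> readOptions, String optionName,
      Map<String, String> sessionConf, String sessionConfName,
      Map<String, String> tableProperties, String tablePropertyName,
      boolean defaultValue) {

    String optionValue = readOptions.get(optionName);
    if (optionValue != null) {
      return Boolean.parseBoolean(optionValue);
    }

    String sessionValue = sessionConf.get(sessionConfName);
    if (sessionValue != null) {
      return Boolean.parseBoolean(sessionValue);
    }

    String propertyValue = tableProperties.get(tablePropertyName);
    if (propertyValue != null) {
      return Boolean.parseBoolean(propertyValue);
    }

    return defaultValue;
  }

  public static void main(String[] args) {
    Map<String, String> readOptions = new HashMap<>();
    Map<String, String> sessionConf = new HashMap<>();
    Map<String, String> tableProperties = new HashMap<>();

    // All key names below are made up for the example.
    tableProperties.put("example.table.enabled", "false");
    sessionConf.put("example.session.enabled", "true");

    boolean enabled = resolveBoolean(
        readOptions, "example-enabled",
        sessionConf, "example.session.enabled",
        tableProperties, "example.table.enabled",
        false);
    System.out.println(enabled);
  }
}
```

Routing every getter through one helper like this would keep the order identical for all properties, instead of each getter deciding which levels to consult.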




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] aokolnychyi commented on pull request #3132: Spark: Add SparkReadConf and SparkWriteConf

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on pull request #3132:
URL: https://github.com/apache/iceberg/pull/3132#issuecomment-926251781


   Thanks for reviewing, @RussellSpitzer @jackye1995! I'll polish this prototype and resolve comments tomorrow.




[GitHub] [iceberg] aokolnychyi commented on a change in pull request #3132: Spark: Add SparkReadConf and SparkWriteConf

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #3132:
URL: https://github.com/apache/iceberg/pull/3132#discussion_r715891629



##########
File path: spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java
##########
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark;
+
+import java.util.Map;
+import java.util.Set;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.hadoop.HadoopInputFile;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
+import org.apache.iceberg.util.PropertyUtil;
+import org.apache.spark.sql.SparkSession;
+
+/**
+ * A class for common Iceberg configs for Spark reads.
+ * <p>
+ * If a config is set at multiple levels, the following order of precedence is used (top to bottom):
+ * <ol>
+ *   <li>Read options</li>
+ *   <li>Session configuration</li>
+ *   <li>Table metadata</li>
+ * </ol>
+ * The most specific value is set in read options and takes precedence over all other configs.
+ * If no read option is provided, this class checks the session configuration for any overrides.
+ * If no applicable value is found in the session configuration, this classes uses the table metadata.
+ * <p>
+ * Note this class is NOT meant to be serialized and sent to executors.
+ */
+public class SparkReadConf {
+
+  private static final Set<String> LOCALITY_WHITELIST_FS = ImmutableSet.of("hdfs");
+
+  private final Table table;
+  private final Map<String, String> readOptions;
+  private final SparkConfParser confParser;
+
+  public SparkReadConf(SparkSession spark, Table table, Map<String, String> readOptions) {
+    this.table = table;
+    this.readOptions = readOptions;
+    this.confParser = new SparkConfParser(spark, table, readOptions);
+  }
+
+  public boolean localityEnabled() {
+    InputFile file = table.io().newInputFile(table.location());
+
+    if (file instanceof HadoopInputFile) {
+      String scheme = ((HadoopInputFile) file).getFileSystem().getScheme();
+      boolean defaultValue = LOCALITY_WHITELIST_FS.contains(scheme);
+      return PropertyUtil.propertyAsBoolean(
+          readOptions,
+          SparkReadOptions.LOCALITY,
+          defaultValue);
+    }
+
+    return false;
+  }
+
+  public boolean parquetVectorizationEnabled() {

Review comment:
       @RussellSpitzer, let me know if I got you here correctly.






[GitHub] [iceberg] aokolnychyi commented on a change in pull request #3132: Spark: Add SparkReadConf and SparkWriteConf

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #3132:
URL: https://github.com/apache/iceberg/pull/3132#discussion_r715891114



##########
File path: spark/src/main/java/org/apache/iceberg/spark/SparkConfParser.java
##########
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark;
+
+import java.util.Map;
+import java.util.function.Function;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.spark.sql.RuntimeConfig;
+import org.apache.spark.sql.SparkSession;
+
+class SparkConfParser {
+
+  private final Map<String, String> properties;
+  private final RuntimeConfig sessionConf;
+  private final Map<String, String> options;
+
+  SparkConfParser(SparkSession spark, Table table, Map<String, String> options) {
+    this.properties = table.properties();
+    this.sessionConf = spark.conf();
+    this.options = options;
+  }
+
+  public BooleanConfParser booleanConf() {
+    return new BooleanConfParser();
+  }
+
+  public IntConfParser intConf() {
+    return new IntConfParser();
+  }
+
+  public LongConfParser longConf() {
+    return new LongConfParser();
+  }
+
+  public StringConfParser stringConf() {
+    return new StringConfParser();
+  }
+
+  class BooleanConfParser extends ConfParser<BooleanConfParser, Boolean> {
+    private Boolean defaultValue;
+
+    @Override
+    protected BooleanConfParser self() {
+      return this;
+    }
+
+    public BooleanConfParser defaultValue(boolean value) {
+      this.defaultValue = value;
+      return self();
+    }
+
+    public boolean parse() {

Review comment:
       All confs I updated are required, i.e. they always have a default value.
   If there is a need for a nullable config, we could add `parseOptional` or something similar that would return the wrapper type.
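
A rough sketch of how `parse` and a possible `parseOptional` could sit side by side is shown below. It is an assumption-laden illustration, not the code from this PR: `BooleanConfSketch`, the `option`, `sessionConf`, and `tableProperty` setters, and all key names are hypothetical, and plain maps stand in for the Spark session conf and table properties.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified stand-in for a boolean conf parser. Names other than
// defaultValue(), parse() and parseOptional() are assumptions for illustration.
final class BooleanConfSketch {
  private final Map<String, String> options;
  private final Map<String, String> sessionValues; // stands in for Spark's RuntimeConfig
  private final Map<String, String> tableProperties;

  private String optionName;
  private String sessionConfName;
  private String tablePropertyName;
  private Boolean defaultValue;

  BooleanConfSketch(
      Map<String, String> options,
      Map<String, String> sessionValues,
      Map<String, String> tableProperties) {
    this.options = options;
    this.sessionValues = sessionValues;
    this.tableProperties = tableProperties;
  }

  BooleanConfSketch option(String name) {
    this.optionName = name;
    return this;
  }

  BooleanConfSketch sessionConf(String name) {
    this.sessionConfName = name;
    return this;
  }

  BooleanConfSketch tableProperty(String name) {
    this.tablePropertyName = name;
    return this;
  }

  BooleanConfSketch defaultValue(boolean value) {
    this.defaultValue = value;
    return this;
  }

  // Required config: a default must be set, so the result is never null.
  boolean parse() {
    if (defaultValue == null) {
      throw new IllegalStateException("Default value cannot be null");
    }
    Boolean value = parseOptional();
    return value != null ? value : defaultValue;
  }

  // Nullable config: returns the wrapper, or null when no level defines the value.
  Boolean parseOptional() {
    String value = lookup(options, optionName);
    if (value == null) {
      value = lookup(sessionValues, sessionConfName);
    }
    if (value == null) {
      value = lookup(tableProperties, tablePropertyName);
    }
    return value != null ? Boolean.valueOf(value) : null;
  }

  private static String lookup(Map<String, String> source, String name) {
    return name != null ? source.get(name) : null;
  }

  public static void main(String[] args) {
    Map<String, String> empty = new HashMap<>();
    BooleanConfSketch parser = new BooleanConfSketch(empty, empty, empty)
        .option("example-flag") // made-up key
        .defaultValue(true);
    System.out.println(parser.parse());
    System.out.println(parser.parseOptional());
  }
}
```

Keeping `parse` strict about the default means callers of required configs never have to handle null, while `parseOptional` leaves that decision to the caller.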






[GitHub] [iceberg] RussellSpitzer commented on a change in pull request #3132: Spark: Add SparkReadConf and SparkWriteConf

Posted by GitBox <gi...@apache.org>.
RussellSpitzer commented on a change in pull request #3132:
URL: https://github.com/apache/iceberg/pull/3132#discussion_r715837636



##########
File path: spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java
##########
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark;
+
+import java.util.Map;
+import java.util.Set;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.hadoop.HadoopInputFile;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
+import org.apache.iceberg.util.PropertyUtil;
+import org.apache.spark.sql.RuntimeConfig;
+import org.apache.spark.sql.SparkSession;
+
+/**
+ * A class for common Iceberg configs for Spark reads.
+ * <p>
+ * If a config is set at multiple levels, the following order of precedence is used (top to bottom):
+ * <ol>
+ *   <li>Read options</li>
+ *   <li>Session configuration</li>
+ *   <li>Table metadata</li>
+ * </ol>
+ * The most specific value is set in read options and takes precedence over all other configs.
+ * If no read option is provided, this class checks the session configuration for any overrides.
+ * If no applicable value is found in the session configuration, this classes uses the table metadata.
+ * <p>
+ * Note this class is NOT meant to be serialized and sent to executors.
+ */
+public class SparkReadConf {
+
+  private static final Set<String> LOCALITY_WHITELIST_FS = ImmutableSet.of("hdfs");
+
+  private final Table table;
+  private final RuntimeConfig sessionConf;
+  private final Map<String, String> readOptions;
+
+  public SparkReadConf(SparkSession spark, Table table, Map<String, String> readOptions) {
+    this.table = table;
+    this.sessionConf = spark.conf();
+    this.readOptions = readOptions;
+  }
+
+  public boolean localityEnabled() {
+    InputFile file = table.io().newInputFile(table.location());
+
+    if (file instanceof HadoopInputFile) {
+      String scheme = ((HadoopInputFile) file).getFileSystem().getScheme();
+      boolean defaultValue = LOCALITY_WHITELIST_FS.contains(scheme);
+      return PropertyUtil.propertyAsBoolean(
+          readOptions,
+          SparkReadOptions.LOCALITY,
+          defaultValue);
+    }
+
+    return false;
+  }
+
+  public boolean vectorizationEnabled(FileFormat fileFormat) {
+    String readOptionValue = readOptions.get(SparkReadOptions.VECTORIZATION_ENABLED);
+    if (readOptionValue != null) {
+      return Boolean.parseBoolean(readOptionValue);
+    }
+
+    String sessionConfValue = sessionConf.get(SparkSQLConfigs.VECTORIZATION_ENABLED, null);
+    if (sessionConfValue != null) {
+      return Boolean.parseBoolean(sessionConfValue);
+    }
+
+    switch (fileFormat) {
+      case PARQUET:
+        return PropertyUtil.propertyAsBoolean(
+            table.properties(),
+            TableProperties.PARQUET_VECTORIZATION_ENABLED,
+            TableProperties.PARQUET_VECTORIZATION_ENABLED_DEFAULT);
+
+      case ORC:
+        return PropertyUtil.propertyAsBoolean(
+            table.properties(),
+            TableProperties.ORC_VECTORIZATION_ENABLED,
+            TableProperties.ORC_VECTORIZATION_ENABLED_DEFAULT);
+
+      default:
+        return false;
+    }
+  }
+
+  public int batchSize(FileFormat fileFormat) {

Review comment:
       Yes, if you think that isn't a good idea we don't have to. I was just thinking that we should have `batchSizeOrc` and `batchSizeParquet` in the conf, and then the reader would choose between them:
   ```
     private int batchSize(boolean isParquetOnly, boolean isOrcOnly) {
       if (isParquetOnly) {
         return readConf.batchSizeParquet();
       } else if (isOrcOnly) {
         return readConf.batchSizeOrc();
       } else {
         return 0;
       }
     }
   ```
   
   That would replace the current code, which passes the file type through.
   






[GitHub] [iceberg] RussellSpitzer commented on a change in pull request #3132: Spark: Add SparkReadConf and SparkWriteConf

Posted by GitBox <gi...@apache.org>.
RussellSpitzer commented on a change in pull request #3132:
URL: https://github.com/apache/iceberg/pull/3132#discussion_r715184576



##########
File path: spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java
##########
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark;
+
+import java.util.Map;
+import java.util.Set;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.hadoop.HadoopInputFile;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
+import org.apache.iceberg.util.PropertyUtil;
+import org.apache.spark.sql.RuntimeConfig;
+import org.apache.spark.sql.SparkSession;
+
+/**
+ * A class for common Iceberg configs for Spark reads.
+ * <p>
+ * If a config is set at multiple levels, the following order of precedence is used (top to bottom):
+ * <ol>
+ *   <li>Read options</li>
+ *   <li>Session configuration</li>
+ *   <li>Table metadata</li>
+ * </ol>
+ * The most specific value is set in read options and takes precedence over all other configs.
+ * If no read option is provided, this class checks the session configuration for any overrides.
+ * If no applicable value is found in the session configuration, this classes uses the table metadata.
+ * <p>
+ * Note this class is NOT meant to be serialized and sent to executors.
+ */
+public class SparkReadConf {
+
+  private static final Set<String> LOCALITY_WHITELIST_FS = ImmutableSet.of("hdfs");
+
+  private final Table table;
+  private final RuntimeConfig sessionConf;
+  private final Map<String, String> readOptions;
+
+  public SparkReadConf(SparkSession spark, Table table, Map<String, String> readOptions) {
+    this.table = table;
+    this.sessionConf = spark.conf();
+    this.readOptions = readOptions;
+  }
+
+  public boolean localityEnabled() {
+    InputFile file = table.io().newInputFile(table.location());
+
+    if (file instanceof HadoopInputFile) {
+      String scheme = ((HadoopInputFile) file).getFileSystem().getScheme();
+      boolean defaultValue = LOCALITY_WHITELIST_FS.contains(scheme);
+      return PropertyUtil.propertyAsBoolean(
+          readOptions,
+          SparkReadOptions.LOCALITY,
+          defaultValue);
+    }
+
+    return false;
+  }
+
+  public boolean vectorizationEnabled(FileFormat fileFormat) {
+    String readOptionValue = readOptions.get(SparkReadOptions.VECTORIZATION_ENABLED);
+    if (readOptionValue != null) {
+      return Boolean.parseBoolean(readOptionValue);
+    }
+
+    String sessionConfValue = sessionConf.get(SparkSQLConfigs.VECTORIZATION_ENABLED, null);
+    if (sessionConfValue != null) {
+      return Boolean.parseBoolean(sessionConfValue);
+    }
+
+    switch (fileFormat) {
+      case PARQUET:
+        return PropertyUtil.propertyAsBoolean(
+            table.properties(),
+            TableProperties.PARQUET_VECTORIZATION_ENABLED,
+            TableProperties.PARQUET_VECTORIZATION_ENABLED_DEFAULT);
+
+      case ORC:
+        return PropertyUtil.propertyAsBoolean(
+            table.properties(),
+            TableProperties.ORC_VECTORIZATION_ENABLED,
+            TableProperties.ORC_VECTORIZATION_ENABLED_DEFAULT);
+
+      default:
+        return false;
+    }
+  }
+
+  public int batchSize(FileFormat fileFormat) {

Review comment:
       This may be personal preference, but I'd much rather we have separate batch size getters for Parquet and ORC and put the logic about which to use in the readers themselves. We already have to do this in
   
   https://github.com/apache/iceberg/pull/3132/files#diff-596268fde5717c19c744ede28de3c9c71d0496fcec746ce774e8a55af25afcfaR363-R369
   
   So why not just have one getter for each property and do the choosing there?
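
On the conf side, one getter per format might look roughly like the sketch below. Everything here is an assumption for illustration: the class `BatchSizeConfSketch`, the property keys, and the default values are invented, and the same key is used for both the read option and the table property to keep the example short.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: one getter per file format, each resolving its own property.
final class BatchSizeConfSketch {
  // Placeholder keys and defaults, invented for this example.
  static final String PARQUET_BATCH_SIZE = "example.parquet.batch-size";
  static final int PARQUET_BATCH_SIZE_DEFAULT = 5000;
  static final String ORC_BATCH_SIZE = "example.orc.batch-size";
  static final int ORC_BATCH_SIZE_DEFAULT = 5000;

  private final Map<String, String> readOptions;
  private final Map<String, String> tableProperties;

  BatchSizeConfSketch(Map<String, String> readOptions, Map<String, String> tableProperties) {
    this.readOptions = readOptions;
    this.tableProperties = tableProperties;
  }

  int batchSizeParquet() {
    return resolveInt(PARQUET_BATCH_SIZE, PARQUET_BATCH_SIZE_DEFAULT);
  }

  int batchSizeOrc() {
    return resolveInt(ORC_BATCH_SIZE, ORC_BATCH_SIZE_DEFAULT);
  }

  // Read option first, then table property, then the default.
  private int resolveInt(String name, int defaultValue) {
    String value = readOptions.get(name);
    if (value == null) {
      value = tableProperties.get(name);
    }
    return value != null ? Integer.parseInt(value) : defaultValue;
  }

  public static void main(String[] args) {
    Map<String, String> readOptions = new HashMap<>();
    Map<String, String> tableProperties = new HashMap<>();
    tableProperties.put(ORC_BATCH_SIZE, "1024");

    BatchSizeConfSketch conf = new BatchSizeConfSketch(readOptions, tableProperties);
    System.out.println(conf.batchSizeParquet());
    System.out.println(conf.batchSizeOrc());
  }
}
```

With getters shaped like this, the conf no longer needs to know about `FileFormat`; the reader, which already knows which formats a scan touches, picks the getter.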






[GitHub] [iceberg] jackye1995 commented on a change in pull request #3132: Spark: Add SparkReadConf and SparkWriteConf

Posted by GitBox <gi...@apache.org>.
jackye1995 commented on a change in pull request #3132:
URL: https://github.com/apache/iceberg/pull/3132#discussion_r719022170



##########
File path: spark/src/main/java/org/apache/iceberg/spark/SparkSQLConfigs.java
##########
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark;
+
+public class SparkSQLConfigs {

Review comment:
       nit: should add defaults for the configs here for clarity
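
As a sketch of what this could look like, each key can be declared next to a `_DEFAULT` constant, in the same spirit as the `TableProperties` pairs used elsewhere in this PR. The class name, keys, and default values below are invented for the example and are not the actual constants.

```java
// Hypothetical constants class; the keys and default values are invented for this example.
final class ExampleSQLConfigs {

  private ExampleSQLConfigs() {
  }

  // Each config key is declared next to its default value.
  static final String CHECK_NULLABILITY = "example.sql.check-nullability";
  static final boolean CHECK_NULLABILITY_DEFAULT = true;

  static final String CHECK_ORDERING = "example.sql.check-ordering";
  static final boolean CHECK_ORDERING_DEFAULT = true;

  public static void main(String[] args) {
    System.out.println(CHECK_NULLABILITY + " defaults to " + CHECK_NULLABILITY_DEFAULT);
    System.out.println(CHECK_ORDERING + " defaults to " + CHECK_ORDERING_DEFAULT);
  }
}
```

Declaring the default beside the key lets a reader see the effective behavior without tracing through the code that parses the config.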






[GitHub] [iceberg] aokolnychyi commented on pull request #3132: Spark: Add SparkReadConf and SparkWriteConf

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on pull request #3132:
URL: https://github.com/apache/iceberg/pull/3132#issuecomment-941163858


   Thanks for reviewing, @RussellSpitzer @jackye1995 @rdblue!




[GitHub] [iceberg] aokolnychyi commented on a change in pull request #3132: Spark: Add SparkReadConf and SparkWriteConf

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #3132:
URL: https://github.com/apache/iceberg/pull/3132#discussion_r726812561



##########
File path: spark/src/main/java/org/apache/iceberg/spark/SparkSQLConfigs.java
##########
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark;
+
+public class SparkSQLConfigs {
+
+  private SparkSQLConfigs() {

Review comment:
       Sounds good to me. Renamed.

##########
File path: spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java
##########
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark;
+
+import java.util.Map;
+import java.util.Set;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.hadoop.HadoopInputFile;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
+import org.apache.iceberg.util.PropertyUtil;
+import org.apache.spark.sql.SparkSession;
+
+/**
+ * A class for common Iceberg configs for Spark reads.
+ * <p>
+ * If a config is set at multiple levels, the following order of precedence is used (top to bottom):
+ * <ol>
+ *   <li>Read options</li>
+ *   <li>Session configuration</li>
+ *   <li>Table metadata</li>
+ * </ol>
+ * The most specific value is set in read options and takes precedence over all other configs.
+ * If no read option is provided, this class checks the session configuration for any overrides.
+ * If no applicable value is found in the session configuration, this classes uses the table metadata.

Review comment:
       Fixed.

##########
File path: spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java
##########
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark;
+
+import java.util.Locale;
+import java.util.Map;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.SnapshotSummary;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.spark.sql.RuntimeConfig;
+import org.apache.spark.sql.SparkSession;
+
+/**
+ * A class for common Iceberg configs for Spark writes.
+ * <p>
+ * If a config is set at multiple levels, the following order of precedence is used (top to bottom):
+ * <ol>
+ *   <li>Write options</li>
+ *   <li>Session configuration</li>
+ *   <li>Table metadata</li>
+ * </ol>
+ * The most specific value is set in write options and takes precedence over all other configs.
+ * If no write option is provided, this class checks the session configuration for any overrides.
+ * If no applicable value is found in the session configuration, this classes uses the table metadata.

Review comment:
       Fixed.

##########
File path: spark/src/main/java/org/apache/iceberg/spark/SparkSQLConfigs.java
##########
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark;
+
+public class SparkSQLConfigs {

Review comment:
       Added wherever applicable.






[GitHub] [iceberg] jackye1995 commented on a change in pull request #3132: Spark: Add SparkReadConf and SparkWriteConf

Posted by GitBox <gi...@apache.org>.
jackye1995 commented on a change in pull request #3132:
URL: https://github.com/apache/iceberg/pull/3132#discussion_r715224759



##########
File path: spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java
##########
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark;
+
+import java.util.Locale;
+import java.util.Map;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.SnapshotSummary;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.util.PropertyUtil;
+import org.apache.spark.sql.RuntimeConfig;
+import org.apache.spark.sql.SparkSession;
+
+/**
+ * A class for common Iceberg configs for Spark writes.
+ * <p>
+ * If a config is set at multiple levels, the following order of precedence is used (top to bottom):
+ * <ol>
+ *   <li>Write options</li>
+ *   <li>Session configuration</li>
+ *   <li>Table metadata</li>
+ * </ol>
+ * The most specific value is set in write options and takes precedence over all other configs.
+ * If no write option is provided, this class checks the session configuration for any overrides.
+ * If no applicable value is found in the session configuration, this classes uses the table metadata.
+ * <p>
+ * Note this class is NOT meant to be serialized and sent to executors.
+ */
+public class SparkWriteConf {
+
+  private final Table table;
+  private final RuntimeConfig sessionConf;
+  private final Map<String, String> writeOptions;
+
+  public SparkWriteConf(SparkSession spark, Table table, Map<String, String> writeOptions) {
+    this.table = table;
+    this.sessionConf = spark.conf();
+    this.writeOptions = writeOptions;
+  }
+
+  public boolean checkNullability() {
+    String writeOptionValue = writeOptions.get(SparkWriteOptions.CHECK_NULLABILITY);
+    if (writeOptionValue != null) {
+      return Boolean.parseBoolean(writeOptionValue);
+    }
+
+    String sessionConfValue = sessionConf.get(SparkSQLConfigs.CHECK_NULLABILITY, null);
+    if (sessionConfValue != null) {
+      return Boolean.parseBoolean(sessionConfValue);
+    }
+
+    return true;
+  }
+
+  public boolean checkOrdering() {
+    String writeOptionValue = writeOptions.get(SparkWriteOptions.CHECK_ORDERING);
+    if (writeOptionValue != null) {
+      return Boolean.parseBoolean(writeOptionValue);
+    }
+
+    String sessionConfValue = sessionConf.get(SparkSQLConfigs.CHECK_ORDERING, null);
+    if (sessionConfValue != null) {
+      return Boolean.parseBoolean(sessionConfValue);
+    }
+
+    return true;
+  }
+
+  /**
+   * Enables writing a timestamp with time zone as a timestamp without time zone.
+   * <p>
+   * Generally, this is not safe as a timestamp without time zone is supposed to represent the wall-clock time,
+   * i.e. no matter the reader/writer timezone 3PM should always be read as 3PM,
+   * but a timestamp with time zone represents instant semantics, i.e. the timestamp
+   * is adjusted so that the corresponding time in the reader timezone is displayed.
+   * <p>
+   * When set to false (default), an exception must be thrown if the table contains a timestamp without time zone.
+   *
+   * @return boolean indicating if writing timestamps without timezone is allowed
+   */
+  public boolean handleTimestampWithoutZone() {
+    String writeOptionValue = writeOptions.get(SparkWriteOptions.HANDLE_TIMESTAMP_WITHOUT_TIMEZONE);
+    if (writeOptionValue != null) {
+      return Boolean.parseBoolean(writeOptionValue);
+    }
+
+    String sessionConfValue = sessionConf.get(SparkSQLConfigs.HANDLE_TIMESTAMP_WITHOUT_TIMEZONE, null);
+    if (sessionConfValue != null) {
+      return Boolean.parseBoolean(sessionConfValue);
+    }
+
+    return false;
+  }
+
+  public String overwriteMode() {
+    String overwriteMode = writeOptions.get("overwrite-mode");
+    return overwriteMode != null ? overwriteMode.toLowerCase(Locale.ROOT) : null;
+  }
+
+  public String wapId() {
+    return sessionConf.get("spark.wap.id", null);
+  }
+
+  public FileFormat dataFileFormat() {
+    String writeOptionValue = writeOptions.get(SparkWriteOptions.WRITE_FORMAT);

Review comment:
       But that would require us to read the default before reading the option value.
   
   Maybe something like `Optional<String> getOption(String optionName)`, which we could then chain with the table property default. Just a thought.
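
   The `Optional`-based chaining suggested in this comment could look roughly like the sketch below. This is only an illustration: the class `OptionChainSketch`, the `getSessionConf` and `resolve` helpers, and the example keys are hypothetical, and plain maps stand in for the write options, the Spark session conf, and the table properties. It is not the code from the PR.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical sketch: plain maps stand in for the write options, the Spark
// session conf, and the table properties. None of these names are from the PR.
class OptionChainSketch {
  private final Map<String, String> writeOptions;
  private final Map<String, String> sessionConf;
  private final Map<String, String> tableProperties;

  OptionChainSketch(Map<String, String> writeOptions,
                    Map<String, String> sessionConf,
                    Map<String, String> tableProperties) {
    this.writeOptions = writeOptions;
    this.sessionConf = sessionConf;
    this.tableProperties = tableProperties;
  }

  // Empty when the write option is not set, so callers can chain fallbacks.
  Optional<String> getOption(String optionName) {
    return Optional.ofNullable(writeOptions.get(optionName));
  }

  Optional<String> getSessionConf(String confName) {
    return Optional.ofNullable(sessionConf.get(confName));
  }

  // Write option first, then session conf, then table property, then the default.
  // Each fallback is evaluated lazily, only when the previous level is empty.
  String resolve(String optionName, String confName, String propertyName, String defaultValue) {
    return getOption(optionName)
        .orElseGet(() -> getSessionConf(confName)
            .orElseGet(() -> tableProperties.getOrDefault(propertyName, defaultValue)));
  }

  public static void main(String[] args) {
    Map<String, String> options = new HashMap<>();
    Map<String, String> session = new HashMap<>();
    Map<String, String> props = new HashMap<>();
    props.put("example.table.property", "orc");

    OptionChainSketch conf = new OptionChainSketch(options, session, props);
    // Only the table property is set here, so it is the value that gets resolved.
    System.out.println(conf.resolve(
        "example-option", "spark.example.conf", "example.table.property", "parquet"));
  }
}
```

   Because the fallbacks are lazy, the table default is not read unless both the option and the session conf are missing, which addresses the concern about reading the default first.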






[GitHub] [iceberg] aokolnychyi commented on a change in pull request #3132: Spark: Add SparkReadConf and SparkWriteConf

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #3132:
URL: https://github.com/apache/iceberg/pull/3132#discussion_r715772435



##########
File path: spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java
##########
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark;
+
+import java.util.Map;
+import java.util.Set;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.hadoop.HadoopInputFile;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
+import org.apache.iceberg.util.PropertyUtil;
+import org.apache.spark.sql.RuntimeConfig;
+import org.apache.spark.sql.SparkSession;
+
+/**
+ * A class for common Iceberg configs for Spark reads.
+ * <p>
+ * If a config is set at multiple levels, the following order of precedence is used (top to bottom):
+ * <ol>
+ *   <li>Read options</li>
+ *   <li>Session configuration</li>
+ *   <li>Table metadata</li>
+ * </ol>
+ * The most specific value is set in read options and takes precedence over all other configs.
+ * If no read option is provided, this class checks the session configuration for any overrides.
+ * If no applicable value is found in the session configuration, this classes uses the table metadata.
+ * <p>
+ * Note this class is NOT meant to be serialized and sent to executors.
+ */
+public class SparkReadConf {
+
+  private static final Set<String> LOCALITY_WHITELIST_FS = ImmutableSet.of("hdfs");
+
+  private final Table table;
+  private final RuntimeConfig sessionConf;
+  private final Map<String, String> readOptions;
+
+  public SparkReadConf(SparkSession spark, Table table, Map<String, String> readOptions) {
+    this.table = table;
+    this.sessionConf = spark.conf();
+    this.readOptions = readOptions;
+  }
+
+  public boolean localityEnabled() {
+    InputFile file = table.io().newInputFile(table.location());
+
+    if (file instanceof HadoopInputFile) {
+      String scheme = ((HadoopInputFile) file).getFileSystem().getScheme();
+      boolean defaultValue = LOCALITY_WHITELIST_FS.contains(scheme);
+      return PropertyUtil.propertyAsBoolean(
+          readOptions,
+          SparkReadOptions.LOCALITY,
+          defaultValue);
+    }
+
+    return false;
+  }
+
+  public boolean vectorizationEnabled(FileFormat fileFormat) {
+    String readOptionValue = readOptions.get(SparkReadOptions.VECTORIZATION_ENABLED);
+    if (readOptionValue != null) {
+      return Boolean.parseBoolean(readOptionValue);
+    }
+
+    String sessionConfValue = sessionConf.get(SparkSQLConfigs.VECTORIZATION_ENABLED, null);

Review comment:
       I kept this as a refactoring for now and restructured existing configs. I agree it would be cleaner to add more SQL confs wherever appropriate.






[GitHub] [iceberg] aokolnychyi merged pull request #3132: Spark: Add SparkReadConf and SparkWriteConf

Posted by GitBox <gi...@apache.org>.
aokolnychyi merged pull request #3132:
URL: https://github.com/apache/iceberg/pull/3132


   




[GitHub] [iceberg] aokolnychyi commented on a change in pull request #3132: Spark: Add SparkReadConf and SparkWriteConf

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #3132:
URL: https://github.com/apache/iceberg/pull/3132#discussion_r726813461



##########
File path: spark/src/main/java/org/apache/iceberg/spark/SparkConfParser.java
##########
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark;
+
+import java.util.Map;
+import java.util.function.Function;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.spark.sql.RuntimeConfig;
+import org.apache.spark.sql.SparkSession;
+
+class SparkConfParser {
+
+  private final Map<String, String> properties;
+  private final RuntimeConfig sessionConf;
+  private final Map<String, String> options;
+
+  SparkConfParser(SparkSession spark, Table table, Map<String, String> options) {
+    this.properties = table.properties();
+    this.sessionConf = spark.conf();
+    this.options = options;
+  }
+
+  public BooleanConfParser booleanConf() {
+    return new BooleanConfParser();
+  }
+
+  public IntConfParser intConf() {
+    return new IntConfParser();
+  }
+
+  public LongConfParser longConf() {
+    return new LongConfParser();
+  }
+
+  public StringConfParser stringConf() {
+    return new StringConfParser();
+  }
+
+  class BooleanConfParser extends ConfParser<BooleanConfParser, Boolean> {
+    private Boolean defaultValue;
+
+    @Override
+    protected BooleanConfParser self() {
+      return this;
+    }
+
+    public BooleanConfParser defaultValue(boolean value) {
+      this.defaultValue = value;
+      return self();
+    }
+
+    public boolean parse() {
+      Preconditions.checkArgument(defaultValue != null, "Default value cannot be null");
+      return parse(Boolean::parseBoolean, defaultValue);
+    }
+  }
+
+  class IntConfParser extends ConfParser<IntConfParser, Integer> {
+    private Integer defaultValue;
+
+    @Override
+    protected IntConfParser self() {
+      return this;
+    }
+
+    public IntConfParser defaultValue(int value) {
+      this.defaultValue = value;
+      return self();
+    }
+
+    public int parse() {
+      Preconditions.checkArgument(defaultValue != null, "Default value cannot be null");
+      return parse(Integer::parseInt, defaultValue);
+    }
+  }
+
+  class LongConfParser extends ConfParser<LongConfParser, Long> {
+    private Long defaultValue;
+
+    @Override
+    protected LongConfParser self() {
+      return this;
+    }
+
+    public LongConfParser defaultValue(long value) {
+      this.defaultValue = value;
+      return self();
+    }
+
+    public long parse() {
+      Preconditions.checkArgument(defaultValue != null, "Default value cannot be null");
+      return parse(Long::parseLong, defaultValue);
+    }
+  }
+
+  class StringConfParser extends ConfParser<StringConfParser, String> {
+    private String defaultValue;
+
+    @Override
+    protected StringConfParser self() {
+      return this;
+    }
+
+    public StringConfParser defaultValue(String value) {
+      this.defaultValue = value;
+      return self();
+    }
+
+    public String parse() {
+      Preconditions.checkArgument(defaultValue != null, "Default value cannot be null");

Review comment:
       I've decided to keep it to match the behavior of `parse` for other types.
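
   To illustrate the behavior being discussed, here is a simplified, self-contained sketch of a fluent parser where every typed `parse()` requires a default, including the string variant. The base class is not visible in the quoted diff, so the `option`, `sessionConf`, and `tableProperty` builder methods, the `ConfParserSketch` class, and the use of plain maps instead of `RuntimeConfig` are all assumptions made for illustration.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical, simplified parser: plain maps stand in for RuntimeConfig and the
// table properties, and the builder methods are assumptions, not the PR's API.
class ConfParserSketch {
  private final Map<String, String> optionValues;
  private final Map<String, String> sessionValues;
  private final Map<String, String> propertyValues;

  ConfParserSketch(Map<String, String> optionValues,
                   Map<String, String> sessionValues,
                   Map<String, String> propertyValues) {
    this.optionValues = optionValues;
    this.sessionValues = sessionValues;
    this.propertyValues = propertyValues;
  }

  StringParser stringConf() {
    return new StringParser();
  }

  LongParser longConf() {
    return new LongParser();
  }

  private static String lookup(Map<String, String> values, String key) {
    return key != null ? values.get(key) : null;
  }

  abstract class Parser<ThisT, T> {
    private String optionName;
    private String sessionConfName;
    private String tablePropertyName;

    protected abstract ThisT self();

    ThisT option(String name) {
      this.optionName = name;
      return self();
    }

    ThisT sessionConf(String name) {
      this.sessionConfName = name;
      return self();
    }

    ThisT tableProperty(String name) {
      this.tablePropertyName = name;
      return self();
    }

    // Checks the option, then the session conf, then the table property,
    // and falls back to the default only if none of them is set.
    protected T parse(Function<String, T> conversion, T defaultValue) {
      String value = lookup(optionValues, optionName);
      if (value == null) {
        value = lookup(sessionValues, sessionConfName);
      }
      if (value == null) {
        value = lookup(propertyValues, tablePropertyName);
      }
      return value != null ? conversion.apply(value) : defaultValue;
    }
  }

  class StringParser extends Parser<StringParser, String> {
    private String defaultValue;

    @Override
    protected StringParser self() {
      return this;
    }

    StringParser defaultValue(String value) {
      this.defaultValue = value;
      return self();
    }

    // Same contract as the numeric parser: a default is mandatory.
    String parse() {
      if (defaultValue == null) {
        throw new IllegalArgumentException("Default value cannot be null");
      }
      return parse(Function.identity(), defaultValue);
    }
  }

  class LongParser extends Parser<LongParser, Long> {
    private Long defaultValue;

    @Override
    protected LongParser self() {
      return this;
    }

    LongParser defaultValue(long value) {
      this.defaultValue = value;
      return self();
    }

    long parse() {
      if (defaultValue == null) {
        throw new IllegalArgumentException("Default value cannot be null");
      }
      return parse(Long::parseLong, defaultValue);
    }
  }
}
```

   With this shape, a call such as `parser.stringConf().option("example-option").parse()` fails fast with an `IllegalArgumentException`, exactly like the numeric variants, rather than silently returning null.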






[GitHub] [iceberg] rdblue commented on a change in pull request #3132: Spark: Add SparkReadConf and SparkWriteConf

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #3132:
URL: https://github.com/apache/iceberg/pull/3132#discussion_r725396350



##########
File path: spark3/src/main/java/org/apache/iceberg/spark/source/SparkMergeScan.java
##########
@@ -68,29 +59,23 @@
   private List<CombinedScanTask> tasks = null; // lazy cache of tasks
   private Set<String> filteredLocations = null;
 
-  SparkMergeScan(SparkSession spark, Table table, boolean caseSensitive, boolean ignoreResiduals,
+  SparkMergeScan(SparkSession spark, Table table, SparkReadConf readConf,
+                 boolean caseSensitive, boolean ignoreResiduals,
                  Schema expectedSchema, List<Expression> filters, CaseInsensitiveStringMap options) {
 
-    super(spark, table, caseSensitive, expectedSchema, filters, options);
+    super(spark, table, readConf, caseSensitive, expectedSchema, filters, options);
 
     this.table = table;
     this.ignoreResiduals = ignoreResiduals;
     this.expectedSchema = expectedSchema;
+    this.splitSize = readConf.splitSize();
+    this.splitLookback = readConf.splitLookback();
+    this.splitOpenFileCost = readConf.splitOpenFileCost();
 
-    Map<String, String> props = table.properties();
-
-    long tableSplitSize = PropertyUtil.propertyAsLong(props, SPLIT_SIZE, SPLIT_SIZE_DEFAULT);
-    this.splitSize = Spark3Util.propertyAsLong(options, SparkReadOptions.SPLIT_SIZE, tableSplitSize);
-
-    int tableSplitLookback = PropertyUtil.propertyAsInt(props, SPLIT_LOOKBACK, SPLIT_LOOKBACK_DEFAULT);
-    this.splitLookback = Spark3Util.propertyAsInt(options, SparkReadOptions.LOOKBACK, tableSplitLookback);
-
-    long tableOpenFileCost = PropertyUtil.propertyAsLong(props, SPLIT_OPEN_FILE_COST, SPLIT_OPEN_FILE_COST_DEFAULT);
-    this.splitOpenFileCost = Spark3Util.propertyAsLong(options, SparkReadOptions.FILE_OPEN_COST, tableOpenFileCost);
-
-    Preconditions.checkArgument(!options.containsKey("snapshot-id"), "Cannot have snapshot-id in options");
+    Preconditions.checkArgument(!options.containsKey(SparkReadOptions.SNAPSHOT_ID), "Can't set snapshot-id in options");
     Snapshot currentSnapshot = table.currentSnapshot();
     this.snapshotId = currentSnapshot != null ? currentSnapshot.snapshotId() : null;
+

Review comment:
       Nit: unnecessary whitespace change.






[GitHub] [iceberg] rdblue commented on a change in pull request #3132: Spark: Add SparkReadConf and SparkWriteConf

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #3132:
URL: https://github.com/apache/iceberg/pull/3132#discussion_r725395847



##########
File path: spark2/src/main/java/org/apache/iceberg/spark/source/Reader.java
##########
@@ -110,16 +110,16 @@
   Reader(SparkSession spark, Table table, boolean caseSensitive, DataSourceOptions options) {
     this.sparkContext = JavaSparkContext.fromSparkContext(spark.sparkContext());
     this.table = table;
-    this.options = options;
+    this.readConf = new SparkReadConf(spark, table, options.asMap());
     this.snapshotId = options.get(SparkReadOptions.SNAPSHOT_ID).map(Long::parseLong).orElse(null);
     this.asOfTimestamp = options.get(SparkReadOptions.AS_OF_TIMESTAMP).map(Long::parseLong).orElse(null);
     if (snapshotId != null && asOfTimestamp != null) {
       throw new IllegalArgumentException(
           "Cannot scan using both snapshot-id and as-of-timestamp to select the table snapshot");
     }
 
-    this.startSnapshotId = options.get("start-snapshot-id").map(Long::parseLong).orElse(null);
-    this.endSnapshotId = options.get("end-snapshot-id").map(Long::parseLong).orElse(null);
+    this.startSnapshotId = options.get(SparkReadOptions.START_SNAPSHOT_ID).map(Long::parseLong).orElse(null);
+    this.endSnapshotId = options.get(SparkReadOptions.END_SNAPSHOT_ID).map(Long::parseLong).orElse(null);

Review comment:
       I do think it would be cleaner with these in the read conf.






[GitHub] [iceberg] aokolnychyi commented on a change in pull request #3132: Spark: Add SparkReadConf and SparkWriteConf

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #3132:
URL: https://github.com/apache/iceberg/pull/3132#discussion_r726813910



##########
File path: spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java
##########
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark;
+
+import java.util.Locale;
+import java.util.Map;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.SnapshotSummary;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.util.PropertyUtil;
+import org.apache.spark.sql.RuntimeConfig;
+import org.apache.spark.sql.SparkSession;
+
+/**
+ * A class for common Iceberg configs for Spark writes.
+ * <p>
+ * If a config is set at multiple levels, the following order of precedence is used (top to bottom):
+ * <ol>
+ *   <li>Write options</li>
+ *   <li>Session configuration</li>
+ *   <li>Table metadata</li>
+ * </ol>
+ * The most specific value is set in write options and takes precedence over all other configs.
+ * If no write option is provided, this class checks the session configuration for any overrides.
+ * If no applicable value is found in the session configuration, this classes uses the table metadata.
+ * <p>
+ * Note this class is NOT meant to be serialized and sent to executors.
+ */
+public class SparkWriteConf {
+
+  private final Table table;
+  private final RuntimeConfig sessionConf;
+  private final Map<String, String> writeOptions;
+
+  public SparkWriteConf(SparkSession spark, Table table, Map<String, String> writeOptions) {
+    this.table = table;
+    this.sessionConf = spark.conf();
+    this.writeOptions = writeOptions;
+  }
+
+  public boolean checkNullability() {
+    String writeOptionValue = writeOptions.get(SparkWriteOptions.CHECK_NULLABILITY);
+    if (writeOptionValue != null) {
+      return Boolean.parseBoolean(writeOptionValue);
+    }
+
+    String sessionConfValue = sessionConf.get(SparkSQLConfigs.CHECK_NULLABILITY, null);
+    if (sessionConfValue != null) {
+      return Boolean.parseBoolean(sessionConfValue);
+    }
+
+    return true;
+  }
+
+  public boolean checkOrdering() {
+    String writeOptionValue = writeOptions.get(SparkWriteOptions.CHECK_ORDERING);
+    if (writeOptionValue != null) {
+      return Boolean.parseBoolean(writeOptionValue);
+    }
+
+    String sessionConfValue = sessionConf.get(SparkSQLConfigs.CHECK_ORDERING, null);
+    if (sessionConfValue != null) {
+      return Boolean.parseBoolean(sessionConfValue);
+    }
+
+    return true;
+  }
+
+  /**
+   * Enables writing a timestamp with time zone as a timestamp without time zone.
+   * <p>
+   * Generally, this is not safe as a timestamp without time zone is supposed to represent the wall-clock time,
+   * i.e. no matter the reader/writer timezone 3PM should always be read as 3PM,
+   * but a timestamp with time zone represents instant semantics, i.e. the timestamp
+   * is adjusted so that the corresponding time in the reader timezone is displayed.
+   * <p>
+   * When set to false (default), an exception must be thrown if the table contains a timestamp without time zone.
+   *
+   * @return boolean indicating if writing timestamps without timezone is allowed
+   */
+  public boolean handleTimestampWithoutZone() {
+    String writeOptionValue = writeOptions.get(SparkWriteOptions.HANDLE_TIMESTAMP_WITHOUT_TIMEZONE);
+    if (writeOptionValue != null) {
+      return Boolean.parseBoolean(writeOptionValue);
+    }
+
+    String sessionConfValue = sessionConf.get(SparkSQLConfigs.HANDLE_TIMESTAMP_WITHOUT_TIMEZONE, null);
+    if (sessionConfValue != null) {
+      return Boolean.parseBoolean(sessionConfValue);
+    }
+
+    return false;
+  }
+
+  public String overwriteMode() {
+    String overwriteMode = writeOptions.get("overwrite-mode");
+    return overwriteMode != null ? overwriteMode.toLowerCase(Locale.ROOT) : null;
+  }
+
+  public String wapId() {
+    return sessionConf.get("spark.wap.id", null);
+  }
+
+  public FileFormat dataFileFormat() {
+    String writeOptionValue = writeOptions.get(SparkWriteOptions.WRITE_FORMAT);

Review comment:
       This has been fixed.






[GitHub] [iceberg] aokolnychyi commented on a change in pull request #3132: Spark: Add SparkReadConf and SparkWriteConf

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #3132:
URL: https://github.com/apache/iceberg/pull/3132#discussion_r726811532



##########
File path: spark2/src/main/java/org/apache/iceberg/spark/source/Reader.java
##########
@@ -110,16 +110,16 @@
   Reader(SparkSession spark, Table table, boolean caseSensitive, DataSourceOptions options) {
     this.sparkContext = JavaSparkContext.fromSparkContext(spark.sparkContext());
     this.table = table;
-    this.options = options;
+    this.readConf = new SparkReadConf(spark, table, options.asMap());
     this.snapshotId = options.get(SparkReadOptions.SNAPSHOT_ID).map(Long::parseLong).orElse(null);
     this.asOfTimestamp = options.get(SparkReadOptions.AS_OF_TIMESTAMP).map(Long::parseLong).orElse(null);
     if (snapshotId != null && asOfTimestamp != null) {
       throw new IllegalArgumentException(
           "Cannot scan using both snapshot-id and as-of-timestamp to select the table snapshot");
     }
 
-    this.startSnapshotId = options.get("start-snapshot-id").map(Long::parseLong).orElse(null);
-    this.endSnapshotId = options.get("end-snapshot-id").map(Long::parseLong).orElse(null);
+    this.startSnapshotId = options.get(SparkReadOptions.START_SNAPSHOT_ID).map(Long::parseLong).orElse(null);
+    this.endSnapshotId = options.get(SparkReadOptions.END_SNAPSHOT_ID).map(Long::parseLong).orElse(null);

Review comment:
       Moved to the read conf.
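
   For readers following along, moving these lookups into the read conf might look something like the sketch below. The accessor names `startSnapshotId()` and `endSnapshotId()`, the class `ReadConfSnapshotSketch`, and the `longOrNull` helper are hypothetical. Only the two option keys are taken from the `SparkReadOptions` diff quoted in this thread.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: the accessor names are assumptions; only the two option
// keys come from the SparkReadOptions diff quoted in this thread.
class ReadConfSnapshotSketch {
  static final String START_SNAPSHOT_ID = "start-snapshot-id";
  static final String END_SNAPSHOT_ID = "end-snapshot-id";

  private final Map<String, String> readOptions;

  ReadConfSnapshotSketch(Map<String, String> readOptions) {
    this.readOptions = readOptions;
  }

  // Returns null when the option is absent, mirroring the orElse(null) in Reader.
  Long startSnapshotId() {
    return longOrNull(START_SNAPSHOT_ID);
  }

  Long endSnapshotId() {
    return longOrNull(END_SNAPSHOT_ID);
  }

  private Long longOrNull(String optionName) {
    String value = readOptions.get(optionName);
    return value != null ? Long.valueOf(value) : null;
  }

  public static void main(String[] args) {
    Map<String, String> options = new HashMap<>();
    options.put(START_SNAPSHOT_ID, "10");

    ReadConfSnapshotSketch readConf = new ReadConfSnapshotSketch(options);
    System.out.println(readConf.startSnapshotId());
    System.out.println(readConf.endSnapshotId());
  }
}
```

   These options are only meaningful per read, so the sketch consults the read options alone and does not fall back to the session conf or table metadata.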






[GitHub] [iceberg] aokolnychyi commented on pull request #3132: Spark: Add SparkReadConf and SparkWriteConf

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on pull request #3132:
URL: https://github.com/apache/iceberg/pull/3132#issuecomment-941163858


   Thanks for reviewing, @RussellSpitzer @jackye1995 @rdblue!




[GitHub] [iceberg] jackye1995 commented on a change in pull request #3132: Spark: Add SparkReadConf and SparkWriteConf

Posted by GitBox <gi...@apache.org>.
jackye1995 commented on a change in pull request #3132:
URL: https://github.com/apache/iceberg/pull/3132#discussion_r719023384



##########
File path: spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java
##########
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark;
+
+import java.util.Map;
+import java.util.Set;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.hadoop.HadoopInputFile;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
+import org.apache.iceberg.util.PropertyUtil;
+import org.apache.spark.sql.SparkSession;
+
+/**
+ * A class for common Iceberg configs for Spark reads.
+ * <p>
+ * If a config is set at multiple levels, the following order of precedence is used (top to bottom):
+ * <ol>
+ *   <li>Read options</li>
+ *   <li>Session configuration</li>
+ *   <li>Table metadata</li>
+ * </ol>
+ * The most specific value is set in read options and takes precedence over all other configs.
+ * If no read option is provided, this class checks the session configuration for any overrides.
+ * If no applicable value is found in the session configuration, this classes uses the table metadata.

Review comment:
       Nit: "this classes" should be "this class".






[GitHub] [iceberg] jackye1995 commented on a change in pull request #3132: Spark: Add SparkReadConf and SparkWriteConf

Posted by GitBox <gi...@apache.org>.
jackye1995 commented on a change in pull request #3132:
URL: https://github.com/apache/iceberg/pull/3132#discussion_r715226008



##########
File path: spark/src/main/java/org/apache/iceberg/spark/SparkReadOptions.java
##########
@@ -30,6 +30,12 @@ private SparkReadOptions() {
   // Snapshot ID of the table snapshot to read
   public static final String SNAPSHOT_ID = "snapshot-id";
 
+  // Start snapshot ID used in incremental scans (exclusive)
+  public static final String START_SNAPSHOT_ID = "start-snapshot-id";
+
+  // End snapshot ID used in incremental scans (inclusive)
+  public static final String END_SNAPSHOT_ID = "end-snapshot-id";

Review comment:
       These 2 are missing corresponding methods in `SparkReadConf`. Is that intentional?






[GitHub] [iceberg] aokolnychyi commented on a change in pull request #3132: Spark: Add SparkReadConf and SparkWriteConf

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #3132:
URL: https://github.com/apache/iceberg/pull/3132#discussion_r726811249



##########
File path: spark3/src/main/java/org/apache/iceberg/spark/source/SparkMergeScan.java
##########
@@ -68,29 +59,23 @@
   private List<CombinedScanTask> tasks = null; // lazy cache of tasks
   private Set<String> filteredLocations = null;
 
-  SparkMergeScan(SparkSession spark, Table table, boolean caseSensitive, boolean ignoreResiduals,
+  SparkMergeScan(SparkSession spark, Table table, SparkReadConf readConf,
+                 boolean caseSensitive, boolean ignoreResiduals,
                  Schema expectedSchema, List<Expression> filters, CaseInsensitiveStringMap options) {
 
-    super(spark, table, caseSensitive, expectedSchema, filters, options);
+    super(spark, table, readConf, caseSensitive, expectedSchema, filters, options);
 
     this.table = table;
     this.ignoreResiduals = ignoreResiduals;
     this.expectedSchema = expectedSchema;
+    this.splitSize = readConf.splitSize();
+    this.splitLookback = readConf.splitLookback();
+    this.splitOpenFileCost = readConf.splitOpenFileCost();
 
-    Map<String, String> props = table.properties();
-
-    long tableSplitSize = PropertyUtil.propertyAsLong(props, SPLIT_SIZE, SPLIT_SIZE_DEFAULT);
-    this.splitSize = Spark3Util.propertyAsLong(options, SparkReadOptions.SPLIT_SIZE, tableSplitSize);
-
-    int tableSplitLookback = PropertyUtil.propertyAsInt(props, SPLIT_LOOKBACK, SPLIT_LOOKBACK_DEFAULT);
-    this.splitLookback = Spark3Util.propertyAsInt(options, SparkReadOptions.LOOKBACK, tableSplitLookback);
-
-    long tableOpenFileCost = PropertyUtil.propertyAsLong(props, SPLIT_OPEN_FILE_COST, SPLIT_OPEN_FILE_COST_DEFAULT);
-    this.splitOpenFileCost = Spark3Util.propertyAsLong(options, SparkReadOptions.FILE_OPEN_COST, tableOpenFileCost);
-
-    Preconditions.checkArgument(!options.containsKey("snapshot-id"), "Cannot have snapshot-id in options");
+    Preconditions.checkArgument(!options.containsKey(SparkReadOptions.SNAPSHOT_ID), "Can't set snapshot-id in options");
     Snapshot currentSnapshot = table.currentSnapshot();
     this.snapshotId = currentSnapshot != null ? currentSnapshot.snapshotId() : null;
+

Review comment:
       This is actually intentional: it separates the block that sets and validates the snapshot ID.






[GitHub] [iceberg] aokolnychyi commented on pull request #3132: Spark: Add SparkReadConf and SparkWriteConf

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on pull request #3132:
URL: https://github.com/apache/iceberg/pull/3132#issuecomment-921111309


   cc @flyrain @karuppayya @RussellSpitzer @kbendick @jackye1995 @rdblue @szehon-ho @shardulm94 @samarthjain




[GitHub] [iceberg] aokolnychyi commented on a change in pull request #3132: Spark: Add SparkReadConf and SparkWriteConf

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #3132:
URL: https://github.com/apache/iceberg/pull/3132#discussion_r710338319



##########
File path: core/src/main/java/org/apache/iceberg/TableProperties.java
##########
@@ -132,6 +132,9 @@ private TableProperties() {
   public static final String ORC_VECTORIZATION_ENABLED = "read.orc.vectorization.enabled";
   public static final boolean ORC_VECTORIZATION_ENABLED_DEFAULT = false;
 
+  public static final String ORC_BATCH_SIZE = "read.orc.vectorization.batch-size";

Review comment:
       I'll submit this separately.






[GitHub] [iceberg] aokolnychyi commented on a change in pull request #3132: Spark: Add SparkReadConf and SparkWriteConf

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #3132:
URL: https://github.com/apache/iceberg/pull/3132#discussion_r710350300



##########
File path: spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java
##########
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark;
+
+import java.util.Locale;
+import java.util.Map;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.SnapshotSummary;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.util.PropertyUtil;
+import org.apache.spark.sql.RuntimeConfig;
+import org.apache.spark.sql.SparkSession;
+
+/**
+ * A class for common Iceberg configs for Spark writes.
+ * <p>
+ * If a config is set at multiple levels, the following order of precedence is used (top to bottom):
+ * <ol>
+ *   <li>Write options</li>
+ *   <li>Session configuration</li>
+ *   <li>Table metadata</li>
+ * </ol>
+ * The most specific value is set in write options and takes precedence over all other configs.
+ * If no write option is provided, this class checks the session configuration for any overrides.
+ * If no applicable value is found in the session configuration, this classes uses the table metadata.
+ * <p>
+ * Note this class is NOT meant to be serialized and sent to executors.
+ */
+public class SparkWriteConf {
+
+  private final Table table;
+  private final RuntimeConfig sessionConf;
+  private final Map<String, String> writeOptions;
+
+  public SparkWriteConf(SparkSession spark, Table table, Map<String, String> writeOptions) {
+    this.table = table;
+    this.sessionConf = spark.conf();
+    this.writeOptions = writeOptions;
+  }
+
+  public boolean checkNullability() {

Review comment:
       This is a breaking change that I mentioned on the dev list.






[GitHub] [iceberg] rdblue commented on a change in pull request #3132: Spark: Add SparkReadConf and SparkWriteConf

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #3132:
URL: https://github.com/apache/iceberg/pull/3132#discussion_r725394048



##########
File path: spark/src/main/java/org/apache/iceberg/spark/SparkSQLConfigs.java
##########
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark;
+
+public class SparkSQLConfigs {
+
+  private SparkSQLConfigs() {

Review comment:
       Similar classes are named:
   * `SystemProperties` for `java.lang.System.getProperties()`
   * `TableProperties` for `Table.properties()`
   * `ConfigProperties` for Hadoop `Configuration`
   * `SparkReadOptions` and `SparkWriteOptions` for Spark's `DataSourceOptions` or `CaseInsensitiveStringMap`
   
   It seems a bit out of place to name this `Configs`. What about `SparkSQLProperties` instead? Spark refers to them as "properties" in places: https://spark.apache.org/docs/latest/configuration.html#spark-properties






[GitHub] [iceberg] aokolnychyi commented on a change in pull request #3132: Spark: Add SparkReadConf and SparkWriteConf

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #3132:
URL: https://github.com/apache/iceberg/pull/3132#discussion_r715841776



##########
File path: spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java
##########
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark;
+
+import java.util.Map;
+import java.util.Set;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.hadoop.HadoopInputFile;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
+import org.apache.iceberg.util.PropertyUtil;
+import org.apache.spark.sql.RuntimeConfig;
+import org.apache.spark.sql.SparkSession;
+
+/**
+ * A class for common Iceberg configs for Spark reads.
+ * <p>
+ * If a config is set at multiple levels, the following order of precedence is used (top to bottom):
+ * <ol>
+ *   <li>Read options</li>
+ *   <li>Session configuration</li>
+ *   <li>Table metadata</li>
+ * </ol>
+ * The most specific value is set in read options and takes precedence over all other configs.
+ * If no read option is provided, this class checks the session configuration for any overrides.
+ * If no applicable value is found in the session configuration, this classes uses the table metadata.
+ * <p>
+ * Note this class is NOT meant to be serialized and sent to executors.
+ */
+public class SparkReadConf {
+
+  private static final Set<String> LOCALITY_WHITELIST_FS = ImmutableSet.of("hdfs");
+
+  private final Table table;
+  private final RuntimeConfig sessionConf;
+  private final Map<String, String> readOptions;
+
+  public SparkReadConf(SparkSession spark, Table table, Map<String, String> readOptions) {
+    this.table = table;
+    this.sessionConf = spark.conf();
+    this.readOptions = readOptions;
+  }
+
+  public boolean localityEnabled() {
+    InputFile file = table.io().newInputFile(table.location());
+
+    if (file instanceof HadoopInputFile) {
+      String scheme = ((HadoopInputFile) file).getFileSystem().getScheme();
+      boolean defaultValue = LOCALITY_WHITELIST_FS.contains(scheme);
+      return PropertyUtil.propertyAsBoolean(
+          readOptions,
+          SparkReadOptions.LOCALITY,
+          defaultValue);
+    }
+
+    return false;
+  }
+
+  public boolean vectorizationEnabled(FileFormat fileFormat) {
+    String readOptionValue = readOptions.get(SparkReadOptions.VECTORIZATION_ENABLED);
+    if (readOptionValue != null) {
+      return Boolean.parseBoolean(readOptionValue);
+    }
+
+    String sessionConfValue = sessionConf.get(SparkSQLConfigs.VECTORIZATION_ENABLED, null);
+    if (sessionConfValue != null) {
+      return Boolean.parseBoolean(sessionConfValue);
+    }
+
+    switch (fileFormat) {
+      case PARQUET:
+        return PropertyUtil.propertyAsBoolean(
+            table.properties(),
+            TableProperties.PARQUET_VECTORIZATION_ENABLED,
+            TableProperties.PARQUET_VECTORIZATION_ENABLED_DEFAULT);
+
+      case ORC:
+        return PropertyUtil.propertyAsBoolean(
+            table.properties(),
+            TableProperties.ORC_VECTORIZATION_ENABLED,
+            TableProperties.ORC_VECTORIZATION_ENABLED_DEFAULT);
+
+      default:
+        return false;
+    }
+  }
+
+  public int batchSize(FileFormat fileFormat) {

Review comment:
       Sounds good, I'll switch to that. 






[GitHub] [iceberg] aokolnychyi commented on a change in pull request #3132: Spark: Add SparkReadConf and SparkWriteConf

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #3132:
URL: https://github.com/apache/iceberg/pull/3132#discussion_r715892998



##########
File path: spark/src/main/java/org/apache/iceberg/spark/SparkConfParser.java
##########
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark;
+
+import java.util.Map;
+import java.util.function.Function;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.spark.sql.RuntimeConfig;
+import org.apache.spark.sql.SparkSession;
+
+class SparkConfParser {

Review comment:
       I'll add a test if this is the way we want to go.
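
To make the idea behind a conf parser concrete, here is a minimal sketch of the precedence described in the Javadoc of `SparkReadConf` and `SparkWriteConf` (option, then session conf, then table metadata, then a default). It is not the code from this PR: `ConfParserSketch` and its methods are hypothetical, plain maps stand in for Spark's `RuntimeConfig` and for `Table.properties()`, and the option and session key strings in `main` are assumed for illustration.

```java
import java.util.Map;

// Hypothetical sketch only. Plain maps stand in for the read/write options,
// Spark's RuntimeConfig, and Table.properties().
class ConfParserSketch {

  private final Map<String, String> options;
  private final Map<String, String> sessionConf;
  private final Map<String, String> tableProps;

  ConfParserSketch(
      Map<String, String> options,
      Map<String, String> sessionConf,
      Map<String, String> tableProps) {
    this.options = options;
    this.sessionConf = sessionConf;
    this.tableProps = tableProps;
  }

  BooleanConf booleanConf() {
    return new BooleanConf();
  }

  // Collects the key to check at each level, then resolves them in order of precedence.
  class BooleanConf {
    private String optionName;
    private String sessionConfName;
    private String tablePropertyName;
    private boolean defaultValue;

    BooleanConf option(String name) {
      this.optionName = name;
      return this;
    }

    BooleanConf sessionConf(String name) {
      this.sessionConfName = name;
      return this;
    }

    BooleanConf tableProperty(String name) {
      this.tablePropertyName = name;
      return this;
    }

    BooleanConf defaultValue(boolean value) {
      this.defaultValue = value;
      return this;
    }

    boolean parse() {
      // 1. option, 2. session conf, 3. table property, 4. default
      String value = lookup(options, optionName);
      if (value == null) {
        value = lookup(sessionConf, sessionConfName);
      }
      if (value == null) {
        value = lookup(tableProps, tablePropertyName);
      }
      return value != null ? Boolean.parseBoolean(value) : defaultValue;
    }
  }

  // A level with no configured key is skipped.
  private static String lookup(Map<String, String> map, String key) {
    return key != null ? map.get(key) : null;
  }

  public static void main(String[] args) {
    ConfParserSketch parser = new ConfParserSketch(
        Map.of(),                                                   // no read option set
        Map.of("spark.sql.iceberg.vectorization.enabled", "true"),  // assumed session key
        Map.of("read.orc.vectorization.enabled", "false"));

    boolean enabled = parser.booleanConf()
        .option("vectorization-enabled")                            // assumed option key
        .sessionConf("spark.sql.iceberg.vectorization.enabled")
        .tableProperty("read.orc.vectorization.enabled")
        .defaultValue(false)
        .parse();

    // The session conf wins here because no read option is present.
    System.out.println(enabled);
  }
}
```

A test for a parser of this shape would mostly need one case per level: set the value at that level and at every lower one, and check that the higher level wins.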






[GitHub] [iceberg] aokolnychyi commented on a change in pull request #3132: Spark: Add SparkReadConf and SparkWriteConf

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #3132:
URL: https://github.com/apache/iceberg/pull/3132#discussion_r715783102



##########
File path: spark/src/main/java/org/apache/iceberg/spark/SparkReadOptions.java
##########
@@ -30,6 +30,12 @@ private SparkReadOptions() {
   // Snapshot ID of the table snapshot to read
   public static final String SNAPSHOT_ID = "snapshot-id";
 
+  // Start snapshot ID used in incremental scans (exclusive)
+  public static final String START_SNAPSHOT_ID = "start-snapshot-id";
+
+  // End snapshot ID used in incremental scans (inclusive)
+  public static final String END_SNAPSHOT_ID = "end-snapshot-id";

Review comment:
       These 2 are purely read configs. There is no table or session config.
   For now, I just replaced hard-coded values with constants.
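
For readers unfamiliar with the two options, the comments in the quoted diff describe the start snapshot ID as exclusive and the end snapshot ID as inclusive. A minimal sketch of that range in plain Java, assuming a linear snapshot history ordered oldest to newest; `IncrementalRangeSketch` and `snapshotsBetween` are hypothetical names and are not part of Iceberg:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration only; it does not use any Iceberg API.
class IncrementalRangeSketch {

  // Returns the snapshot IDs in (startId, endId], given a linear history ordered oldest to newest.
  static List<Long> snapshotsBetween(List<Long> history, long startId, long endId) {
    int start = history.indexOf(startId);
    int end = history.indexOf(endId);
    if (start < 0 || end < 0 || start > end) {
      throw new IllegalArgumentException("Invalid snapshot range: " + startId + " -> " + endId);
    }
    // subList is [from, to), so both bounds shift by one to get (start, end]
    return new ArrayList<>(history.subList(start + 1, end + 1));
  }

  public static void main(String[] args) {
    List<Long> history = Arrays.asList(10L, 20L, 30L, 40L);
    // Changes made after snapshot 10, up to and including snapshot 30
    System.out.println(snapshotsBetween(history, 10L, 30L));
  }
}
```

Because these values only make sense for a single read, there is nothing to fall back to at the session or table level, which matches the reply above.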






[GitHub] [iceberg] jackye1995 commented on a change in pull request #3132: Spark: Add SparkReadConf and SparkWriteConf

Posted by GitBox <gi...@apache.org>.
jackye1995 commented on a change in pull request #3132:
URL: https://github.com/apache/iceberg/pull/3132#discussion_r719023063



##########
File path: spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java
##########
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark;
+
+import java.util.Locale;
+import java.util.Map;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.SnapshotSummary;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.spark.sql.RuntimeConfig;
+import org.apache.spark.sql.SparkSession;
+
+/**
+ * A class for common Iceberg configs for Spark writes.
+ * <p>
+ * If a config is set at multiple levels, the following order of precedence is used (top to bottom):
+ * <ol>
+ *   <li>Write options</li>
+ *   <li>Session configuration</li>
+ *   <li>Table metadata</li>
+ * </ol>
+ * The most specific value is set in write options and takes precedence over all other configs.
+ * If no write option is provided, this class checks the session configuration for any overrides.
+ * If no applicable value is found in the session configuration, this classes uses the table metadata.

Review comment:
       nit: this class






[GitHub] [iceberg] RussellSpitzer commented on a change in pull request #3132: Spark: Add SparkReadConf and SparkWriteConf

Posted by GitBox <gi...@apache.org>.
RussellSpitzer commented on a change in pull request #3132:
URL: https://github.com/apache/iceberg/pull/3132#discussion_r715185071



##########
File path: spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java
##########
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark;
+
+import java.util.Map;
+import java.util.Set;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.hadoop.HadoopInputFile;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
+import org.apache.iceberg.util.PropertyUtil;
+import org.apache.spark.sql.RuntimeConfig;
+import org.apache.spark.sql.SparkSession;
+
+/**
+ * A class for common Iceberg configs for Spark reads.
+ * <p>
+ * If a config is set at multiple levels, the following order of precedence is used (top to bottom):
+ * <ol>
+ *   <li>Read options</li>
+ *   <li>Session configuration</li>
+ *   <li>Table metadata</li>
+ * </ol>
+ * The most specific value is set in read options and takes precedence over all other configs.
+ * If no read option is provided, this class checks the session configuration for any overrides.
+ * If no applicable value is found in the session configuration, this classes uses the table metadata.
+ * <p>
+ * Note this class is NOT meant to be serialized and sent to executors.
+ */
+public class SparkReadConf {
+
+  private static final Set<String> LOCALITY_WHITELIST_FS = ImmutableSet.of("hdfs");
+
+  private final Table table;
+  private final RuntimeConfig sessionConf;
+  private final Map<String, String> readOptions;
+
+  public SparkReadConf(SparkSession spark, Table table, Map<String, String> readOptions) {
+    this.table = table;
+    this.sessionConf = spark.conf();
+    this.readOptions = readOptions;
+  }
+
+  public boolean localityEnabled() {
+    InputFile file = table.io().newInputFile(table.location());
+
+    if (file instanceof HadoopInputFile) {
+      String scheme = ((HadoopInputFile) file).getFileSystem().getScheme();
+      boolean defaultValue = LOCALITY_WHITELIST_FS.contains(scheme);
+      return PropertyUtil.propertyAsBoolean(
+          readOptions,
+          SparkReadOptions.LOCALITY,
+          defaultValue);
+    }
+
+    return false;
+  }
+
+  public boolean vectorizationEnabled(FileFormat fileFormat) {
+    String readOptionValue = readOptions.get(SparkReadOptions.VECTORIZATION_ENABLED);
+    if (readOptionValue != null) {
+      return Boolean.parseBoolean(readOptionValue);
+    }
+
+    String sessionConfValue = sessionConf.get(SparkSQLConfigs.VECTORIZATION_ENABLED, null);
+    if (sessionConfValue != null) {
+      return Boolean.parseBoolean(sessionConfValue);
+    }
+
+    switch (fileFormat) {
+      case PARQUET:
+        return PropertyUtil.propertyAsBoolean(
+            table.properties(),
+            TableProperties.PARQUET_VECTORIZATION_ENABLED,
+            TableProperties.PARQUET_VECTORIZATION_ENABLED_DEFAULT);
+
+      case ORC:
+        return PropertyUtil.propertyAsBoolean(
+            table.properties(),
+            TableProperties.ORC_VECTORIZATION_ENABLED,
+            TableProperties.ORC_VECTORIZATION_ENABLED_DEFAULT);
+
+      default:
+        return false;
+    }
+  }
+
+  public int batchSize(FileFormat fileFormat) {

Review comment:
       This keeps the logic of "Do you support batch reads" in the batch reader rather than in the global configuration class.
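
One possible reading of this comment, sketched in plain Java: the configuration class only resolves values, and the reader decides which formats it can read in batches. This is an assumption about the intent, not the code from the PR. All class and method names below are hypothetical, the Parquet key and the default of 5000 are assumed for illustration, and only the ORC key appears in the diff to `TableProperties` quoted earlier in this thread.

```java
import java.util.Map;

// Hypothetical sketch; the names are illustrative and not the PR's actual API.
class BatchReadSketch {

  enum Format { PARQUET, ORC, AVRO }

  // The config class only resolves values. It does not know which formats support batch reads.
  static class ReadConf {
    private final Map<String, String> tableProps;

    ReadConf(Map<String, String> tableProps) {
      this.tableProps = tableProps;
    }

    int parquetBatchSize() {
      // Assumed key and default, for illustration only
      return intProp("read.parquet.vectorization.batch-size", 5000);
    }

    int orcBatchSize() {
      // Key taken from the TableProperties diff; the default is assumed
      return intProp("read.orc.vectorization.batch-size", 5000);
    }

    private int intProp(String key, int defaultValue) {
      String value = tableProps.get(key);
      return value != null ? Integer.parseInt(value) : defaultValue;
    }
  }

  // The reader owns the "do you support batch reads" decision.
  static class BatchReader {
    private final ReadConf conf;

    BatchReader(ReadConf conf) {
      this.conf = conf;
    }

    boolean supportsBatchReads(Format format) {
      return format == Format.PARQUET || format == Format.ORC;
    }

    int batchSize(Format format) {
      switch (format) {
        case PARQUET:
          return conf.parquetBatchSize();
        case ORC:
          return conf.orcBatchSize();
        default:
          throw new UnsupportedOperationException("Batch reads are not supported for " + format);
      }
    }
  }

  public static void main(String[] args) {
    ReadConf conf = new ReadConf(Map.of("read.orc.vectorization.batch-size", "1024"));
    BatchReader reader = new BatchReader(conf);
    System.out.println(reader.batchSize(Format.ORC));
    System.out.println(reader.supportsBatchReads(Format.AVRO));
  }
}
```

With this split, adding batch support for another format changes the reader and adds one accessor to the config class, but the config class never needs a `switch` over file formats.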



