You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by bl...@apache.org on 2018/12/14 18:04:22 UTC

[incubator-iceberg] branch master updated: Allow custom hadoop properties to be loaded in the Spark data source (#7)

This is an automated email from the ASF dual-hosted git repository.

blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new ec31b27  Allow custom hadoop properties to be loaded in the Spark data source (#7)
ec31b27 is described below

commit ec31b27c5296ad73765939763ca46bb2d43ae140
Author: mccheah <mc...@palantir.com>
AuthorDate: Fri Dec 14 10:04:17 2018 -0800

    Allow custom hadoop properties to be loaded in the Spark data source (#7)
    
    Properties that start with iceberg.hadoop are copied into the Hadoop Configuration used in the Spark source. These may be set in table properties or in read and write options passed to the Spark operation. Read and write options take precedence over the table properties.
    
    Other Iceberg integrations should also support these custom Hadoop properties; that work is left to subsequent patches.
---
 .../iceberg/spark/source/IcebergSource.java        | 40 +++++++++++++++++-----
 .../iceberg/spark/source/TestIcebergSource.java    |  3 +-
 2 files changed, 33 insertions(+), 10 deletions(-)

diff --git a/spark/src/main/java/com/netflix/iceberg/spark/source/IcebergSource.java b/spark/src/main/java/com/netflix/iceberg/spark/source/IcebergSource.java
index 7daa330..cd1a0af 100644
--- a/spark/src/main/java/com/netflix/iceberg/spark/source/IcebergSource.java
+++ b/spark/src/main/java/com/netflix/iceberg/spark/source/IcebergSource.java
@@ -39,6 +39,7 @@ import org.apache.spark.sql.sources.v2.writer.DataSourceWriter;
 import org.apache.spark.sql.types.StructType;
 import java.util.List;
 import java.util.Locale;
+import java.util.Map;
 import java.util.Optional;
 
 import static com.netflix.iceberg.TableProperties.DEFAULT_FILE_FORMAT;
@@ -56,16 +57,18 @@ public class IcebergSource implements DataSourceV2, ReadSupport, WriteSupport, D
 
   @Override
   public DataSourceReader createReader(DataSourceOptions options) {
-    Table table = findTable(options);
-    return new Reader(table, lazyConf());
+    Configuration conf = new Configuration(lazyBaseConf());
+    Table table = getTableAndResolveHadoopConfiguration(options, conf);
+
+    return new Reader(table, conf);
   }
 
   @Override
   public Optional<DataSourceWriter> createWriter(String jobId, StructType dfStruct, SaveMode mode,
                                                    DataSourceOptions options) {
     Preconditions.checkArgument(mode == SaveMode.Append, "Save mode %s is not supported", mode);
-
-    Table table = findTable(options);
+    Configuration conf = new Configuration(lazyBaseConf());
+    Table table = getTableAndResolveHadoopConfiguration(options, conf);
 
     Schema dfSchema = SparkSchemaUtil.convert(table.schema(), dfStruct);
     List<String> errors = CheckCompatibility.writeCompatibilityErrors(table.schema(), dfSchema);
@@ -89,30 +92,49 @@ public class IcebergSource implements DataSourceV2, ReadSupport, WriteSupport, D
           .toUpperCase(Locale.ENGLISH));
     }
 
-    return Optional.of(new Writer(table, lazyConf(), format));
+    return Optional.of(new Writer(table, conf, format));
   }
 
-  protected Table findTable(DataSourceOptions options) {
+  protected Table findTable(DataSourceOptions options, Configuration conf) {
     Optional<String> location = options.get("path");
     Preconditions.checkArgument(location.isPresent(),
         "Cannot open table without a location: path is not set");
 
-    HadoopTables tables = new HadoopTables(lazyConf());
+    HadoopTables tables = new HadoopTables(conf);
 
     return tables.load(location.get());
   }
 
-  protected SparkSession lazySparkSession() {
+  private SparkSession lazySparkSession() {
     if (lazySpark == null) {
       this.lazySpark = SparkSession.builder().getOrCreate();
     }
     return lazySpark;
   }
 
-  protected Configuration lazyConf() {
+  private Configuration lazyBaseConf() {
     if (lazyConf == null) {
       this.lazyConf = lazySparkSession().sparkContext().hadoopConfiguration();
     }
     return lazyConf;
   }
+
+  private Table getTableAndResolveHadoopConfiguration(
+      DataSourceOptions options, Configuration conf) {
+    // Apply the iceberg.hadoop options to conf (mutated in place) first, so findTable() sees them.
+    mergeIcebergHadoopConfs(conf, options.asMap());
+    Table table = findTable(options, conf);
+    // Layer the iceberg.hadoop entries from the loaded table's properties on top of conf.
+    mergeIcebergHadoopConfs(conf, table.properties());
+    // Re-apply the options last so that they take precedence over the table properties.
+    mergeIcebergHadoopConfs(conf, options.asMap());
+    return table;
+  }
+
+  // Copies every "iceberg.hadoop."-prefixed entry of options into baseConf, keyed without the prefix.
+  private static void mergeIcebergHadoopConfs(Configuration baseConf, Map<String, String> options) {
+    String prefix = "iceberg.hadoop.";  // literal match incl. the dot: "iceberg.hadoop.a.b" -> "a.b"
+    options.entrySet().stream().filter(entry -> entry.getKey().startsWith(prefix))
+        .forEach(entry -> baseConf.set(entry.getKey().substring(prefix.length()), entry.getValue()));
+  }
 }
diff --git a/spark/src/test/java/com/netflix/iceberg/spark/source/TestIcebergSource.java b/spark/src/test/java/com/netflix/iceberg/spark/source/TestIcebergSource.java
index a544162..357671b 100644
--- a/spark/src/test/java/com/netflix/iceberg/spark/source/TestIcebergSource.java
+++ b/spark/src/test/java/com/netflix/iceberg/spark/source/TestIcebergSource.java
@@ -20,6 +20,7 @@
 package com.netflix.iceberg.spark.source;
 
 import com.netflix.iceberg.Table;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.spark.sql.sources.v2.DataSourceOptions;
 
 public class TestIcebergSource extends IcebergSource {
@@ -29,7 +30,7 @@ public class TestIcebergSource extends IcebergSource {
   }
 
   @Override
-  protected Table findTable(DataSourceOptions options) {
+  protected Table findTable(DataSourceOptions options, Configuration conf) {
     return TestTables.load(options.get("iceberg.table.name").get());
   }
 }