Posted to commits@iceberg.apache.org by bl...@apache.org on 2019/06/18 22:14:41 UTC

[incubator-iceberg] branch master updated: Make Spark options use a consistent style (#224)

This is an automated email from the ASF dual-hosted git repository.

blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 9881f21  Make Spark options use a consistent style (#224)
9881f21 is described below

commit 9881f21a28a3bb189c8df94791389ccbc35390ab
Author: Anton Okolnychyi <ao...@apple.com>
AuthorDate: Tue Jun 18 23:14:37 2019 +0100

    Make Spark options use a consistent style (#224)
---
 .../apache/iceberg/spark/source/IcebergSource.java |   6 +-
 .../spark/source/TestDataSourceOptions.java        | 167 +++++++++++++++++++++
 2 files changed, 170 insertions(+), 3 deletions(-)

diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/IcebergSource.java b/spark/src/main/java/org/apache/iceberg/spark/source/IcebergSource.java
index b3a5fc3..7f41a75 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/source/IcebergSource.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/source/IcebergSource.java
@@ -83,7 +83,7 @@ public class IcebergSource implements DataSourceV2, ReadSupport, WriteSupport, D
       throw new IllegalArgumentException(sb.toString());
     }
 
-    Optional<String> formatOption = options.get("iceberg.write.format");
+    Optional<String> formatOption = options.get("write-format");
     FileFormat format;
     if (formatOption.isPresent()) {
       format = FileFormat.valueOf(formatOption.get().toUpperCase(Locale.ENGLISH));
@@ -135,7 +135,7 @@ public class IcebergSource implements DataSourceV2, ReadSupport, WriteSupport, D
   private static void mergeIcebergHadoopConfs(
       Configuration baseConf, Map<String, String> options) {
     options.keySet().stream()
-        .filter(key -> key.startsWith("iceberg.hadoop"))
-        .forEach(key -> baseConf.set(key.replaceFirst("iceberg.hadoop", ""), options.get(key)));
+        .filter(key -> key.startsWith("hadoop."))
+        .forEach(key -> baseConf.set(key.replaceFirst("hadoop.", ""), options.get(key)));
   }
 }
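
For context (not part of this commit), a minimal sketch of how a Spark job would pass the renamed options; "df", "spark", and "tableLocation" are placeholders, and the Iceberg table is assumed to already exist at that location. The new TestDataSourceOptions suite below exercises the same calls against a real table.

    // Hypothetical usage of the renamed options: "write-format" replaces
    // "iceberg.write.format", and the "hadoop." prefix replaces "iceberg.hadoop".
    df.write()
        .format("iceberg")
        .option("write-format", "parquet")                   // overrides the table's default file format
        .option("hadoop.iceberg.compress.metadata", "true")  // copied into the Hadoop Configuration as "iceberg.compress.metadata"
        .mode("append")
        .save(tableLocation);

    Dataset<Row> result = spark.read()
        .format("iceberg")
        .option("hadoop.iceberg.compress.metadata", "true")
        .load(tableLocation);
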
diff --git a/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java b/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java
new file mode 100644
index 0000000..ee21902
--- /dev/null
+++ b/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.ConfigProperties;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.hadoop.HadoopTables;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.types.Types;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import static org.apache.iceberg.types.Types.NestedField.optional;
+
+public class TestDataSourceOptions {
+
+  private static final Configuration CONF = new Configuration();
+  private static final Schema SCHEMA = new Schema(
+      optional(1, "id", Types.IntegerType.get()),
+      optional(2, "data", Types.StringType.get())
+  );
+  private static SparkSession spark = null;
+
+  @Rule
+  public TemporaryFolder temp = new TemporaryFolder();
+
+  @BeforeClass
+  public static void startSpark() {
+    TestDataSourceOptions.spark = SparkSession.builder().master("local[2]").getOrCreate();
+  }
+
+  @AfterClass
+  public static void stopSpark() {
+    SparkSession currentSpark = TestDataSourceOptions.spark;
+    TestDataSourceOptions.spark = null;
+    currentSpark.stop();
+  }
+
+  @Test
+  public void testWriteFormatOptionOverridesTableProperties() throws IOException {
+    String tableLocation = temp.newFolder("iceberg-table").toString();
+
+    HadoopTables tables = new HadoopTables(CONF);
+    PartitionSpec spec = PartitionSpec.unpartitioned();
+    Map<String, String> options = Maps.newHashMap();
+    options.put(TableProperties.DEFAULT_FILE_FORMAT, "avro");
+    Table table = tables.create(SCHEMA, spec, options, tableLocation);
+
+    List<SimpleRecord> expectedRecords = Lists.newArrayList(
+        new SimpleRecord(1, "a"),
+        new SimpleRecord(2, "b"),
+        new SimpleRecord(3, "c")
+    );
+    Dataset<Row> df = spark.createDataFrame(expectedRecords, SimpleRecord.class);
+    df.select("id", "data").write()
+        .format("iceberg")
+        .option("write-format", "parquet")
+        .mode("append")
+        .save(tableLocation);
+
+    try (CloseableIterable<FileScanTask> tasks = table.newScan().planFiles()) {
+      tasks.forEach(task -> {
+        FileFormat fileFormat = FileFormat.fromFileName(task.file().path());
+        Assert.assertEquals(FileFormat.PARQUET, fileFormat);
+      });
+    }
+  }
+
+  @Test
+  public void testNoWriteFormatOption() throws IOException {
+    String tableLocation = temp.newFolder("iceberg-table").toString();
+
+    HadoopTables tables = new HadoopTables(CONF);
+    PartitionSpec spec = PartitionSpec.unpartitioned();
+    Map<String, String> options = Maps.newHashMap();
+    options.put(TableProperties.DEFAULT_FILE_FORMAT, "avro");
+    Table table = tables.create(SCHEMA, spec, options, tableLocation);
+
+    List<SimpleRecord> expectedRecords = Lists.newArrayList(
+        new SimpleRecord(1, "a"),
+        new SimpleRecord(2, "b"),
+        new SimpleRecord(3, "c")
+    );
+    Dataset<Row> df = spark.createDataFrame(expectedRecords, SimpleRecord.class);
+    df.select("id", "data").write()
+        .format("iceberg")
+        .mode("append")
+        .save(tableLocation);
+
+    try (CloseableIterable<FileScanTask> tasks = table.newScan().planFiles()) {
+      tasks.forEach(task -> {
+        FileFormat fileFormat = FileFormat.fromFileName(task.file().path());
+        Assert.assertEquals(FileFormat.AVRO, fileFormat);
+      });
+    }
+  }
+
+  @Test
+  public void testHadoopOptions() throws IOException {
+    String tableLocation = temp.newFolder("iceberg-table").toString();
+
+    Configuration customHadoopConf = new Configuration(CONF);
+    customHadoopConf.set(ConfigProperties.COMPRESS_METADATA, "true");
+
+    HadoopTables tables = new HadoopTables(customHadoopConf);
+    PartitionSpec spec = PartitionSpec.unpartitioned();
+    Map<String, String> options = Maps.newHashMap();
+    tables.create(SCHEMA, spec, options, tableLocation);
+
+    List<SimpleRecord> expectedRecords = Lists.newArrayList(
+        new SimpleRecord(1, "a"),
+        new SimpleRecord(2, "b")
+    );
+    Dataset<Row> originalDf = spark.createDataFrame(expectedRecords, SimpleRecord.class);
+    originalDf.select("id", "data").write()
+        .format("iceberg")
+        .mode("append")
+        .option("hadoop.iceberg.compress.metadata", "true")
+        .save(tableLocation);
+
+    Dataset<Row> resultDf = spark.read()
+        .format("iceberg")
+        .option("hadoop.iceberg.compress.metadata", "true")
+        .load(tableLocation);
+    List<SimpleRecord> resultRecords = resultDf.orderBy("id")
+        .as(Encoders.bean(SimpleRecord.class))
+        .collectAsList();
+
+    Assert.assertEquals("Records should match", expectedRecords, resultRecords);
+  }
+}
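
Aside (not part of the commit): a minimal self-contained sketch of the prefix handling in mergeIcebergHadoopConfs above. Only option keys starting with "hadoop." are copied into the Hadoop Configuration, with the prefix stripped; all other options are left for the Iceberg source itself. The class and values below are illustrative only.

    import java.util.Map;
    import org.apache.hadoop.conf.Configuration;
    import com.google.common.collect.Maps;

    public class HadoopPrefixSketch {
      // Mirrors the filter/strip logic from IcebergSource.mergeIcebergHadoopConfs.
      static void merge(Configuration baseConf, Map<String, String> options) {
        options.keySet().stream()
            .filter(key -> key.startsWith("hadoop."))
            .forEach(key -> baseConf.set(key.replaceFirst("hadoop.", ""), options.get(key)));
      }

      public static void main(String[] args) {
        Map<String, String> options = Maps.newHashMap();
        options.put("hadoop.iceberg.compress.metadata", "true"); // copied as "iceberg.compress.metadata"
        options.put("write-format", "parquet");                  // ignored: no "hadoop." prefix

        Configuration conf = new Configuration(false);
        merge(conf, options);
        System.out.println(conf.get("iceberg.compress.metadata")); // prints "true"
        System.out.println(conf.get("write-format"));              // prints "null"
      }
    }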