Posted to commits@iceberg.apache.org by bl...@apache.org on 2019/06/18 22:14:41 UTC
[incubator-iceberg] branch master updated: Make Spark options use a consistent style (#224)
This is an automated email from the ASF dual-hosted git repository.
blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 9881f21 Make Spark options use a consistent style (#224)
9881f21 is described below
commit 9881f21a28a3bb189c8df94791389ccbc35390ab
Author: Anton Okolnychyi <ao...@apple.com>
AuthorDate: Tue Jun 18 23:14:37 2019 +0100
Make Spark options use a consistent style (#224)
---
 .../apache/iceberg/spark/source/IcebergSource.java |   6 +-
 .../spark/source/TestDataSourceOptions.java        | 167 +++++++++++++++++++++
2 files changed, 170 insertions(+), 3 deletions(-)
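
The commit renames the Spark data source options to one consistent style:

    iceberg.write.format   ->  write-format
    iceberg.hadoop.<key>   ->  hadoop.<key>

A minimal sketch of a write using the renamed options (it assumes a running
SparkSession named "spark", the SimpleRecord test bean used below, Guava's
Lists, and an existing Iceberg table at tableLocation; these names are
stand-ins, not part of this commit):

    // Hypothetical usage; the option keys are the ones introduced here.
    List<SimpleRecord> records = Lists.newArrayList(new SimpleRecord(1, "a"));
    Dataset<Row> df = spark.createDataFrame(records, SimpleRecord.class);
    df.select("id", "data").write()
        .format("iceberg")
        .option("write-format", "parquet")                   // was: iceberg.write.format
        .option("hadoop.iceberg.compress.metadata", "true")  // was: iceberg.hadoop.iceberg.compress.metadata
        .mode("append")
        .save(tableLocation);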
diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/IcebergSource.java b/spark/src/main/java/org/apache/iceberg/spark/source/IcebergSource.java
index b3a5fc3..7f41a75 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/source/IcebergSource.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/source/IcebergSource.java
@@ -83,7 +83,7 @@ public class IcebergSource implements DataSourceV2, ReadSupport, WriteSupport, D
       throw new IllegalArgumentException(sb.toString());
     }

-    Optional<String> formatOption = options.get("iceberg.write.format");
+    Optional<String> formatOption = options.get("write-format");
     FileFormat format;
     if (formatOption.isPresent()) {
       format = FileFormat.valueOf(formatOption.get().toUpperCase(Locale.ENGLISH));
@@ -135,7 +135,7 @@ public class IcebergSource implements DataSourceV2, ReadSupport, WriteSupport, D
   private static void mergeIcebergHadoopConfs(
       Configuration baseConf, Map<String, String> options) {
     options.keySet().stream()
-        .filter(key -> key.startsWith("iceberg.hadoop"))
-        .forEach(key -> baseConf.set(key.replaceFirst("iceberg.hadoop", ""), options.get(key)));
+        .filter(key -> key.startsWith("hadoop."))
+        .forEach(key -> baseConf.set(key.replaceFirst("hadoop.", ""), options.get(key)));
   }
 }
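
The second hunk is the option-to-Configuration bridge: every source option
whose key starts with "hadoop." is copied into the job's Hadoop Configuration
with that prefix stripped. A self-contained sketch of the same stream pipeline
(the class name and sample values are illustrative only):

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.hadoop.conf.Configuration;

    public class HadoopPrefixSketch {
      public static void main(String[] args) {
        Configuration baseConf = new Configuration();

        Map<String, String> options = new HashMap<>();
        options.put("hadoop.iceberg.compress.metadata", "true"); // forwarded
        options.put("write-format", "parquet");                  // no "hadoop." prefix, left alone

        // Same logic as mergeIcebergHadoopConfs above: keep only the
        // "hadoop."-prefixed keys and set the remainder of each key on the
        // Hadoop Configuration.
        options.keySet().stream()
            .filter(key -> key.startsWith("hadoop."))
            .forEach(key -> baseConf.set(key.replaceFirst("hadoop.", ""), options.get(key)));

        // Prints "true": the option is now visible as iceberg.compress.metadata.
        System.out.println(baseConf.get("iceberg.compress.metadata"));
      }
    }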
diff --git a/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java b/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java
new file mode 100644
index 0000000..ee21902
--- /dev/null
+++ b/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.ConfigProperties;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.hadoop.HadoopTables;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.types.Types;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import static org.apache.iceberg.types.Types.NestedField.optional;
+
+public class TestDataSourceOptions {
+
+  private static final Configuration CONF = new Configuration();
+  private static final Schema SCHEMA = new Schema(
+      optional(1, "id", Types.IntegerType.get()),
+      optional(2, "data", Types.StringType.get())
+  );
+  private static SparkSession spark = null;
+
+  @Rule
+  public TemporaryFolder temp = new TemporaryFolder();
+
+  @BeforeClass
+  public static void startSpark() {
+    TestDataSourceOptions.spark = SparkSession.builder().master("local[2]").getOrCreate();
+  }
+
+  @AfterClass
+  public static void stopSpark() {
+    SparkSession currentSpark = TestDataSourceOptions.spark;
+    TestDataSourceOptions.spark = null;
+    currentSpark.stop();
+  }
+
+  @Test
+  public void testWriteFormatOptionOverridesTableProperties() throws IOException {
+    String tableLocation = temp.newFolder("iceberg-table").toString();
+
+    HadoopTables tables = new HadoopTables(CONF);
+    PartitionSpec spec = PartitionSpec.unpartitioned();
+    Map<String, String> options = Maps.newHashMap();
+    options.put(TableProperties.DEFAULT_FILE_FORMAT, "avro");
+    Table table = tables.create(SCHEMA, spec, options, tableLocation);
+
+    List<SimpleRecord> expectedRecords = Lists.newArrayList(
+        new SimpleRecord(1, "a"),
+        new SimpleRecord(2, "b"),
+        new SimpleRecord(3, "c")
+    );
+    Dataset<Row> df = spark.createDataFrame(expectedRecords, SimpleRecord.class);
+    df.select("id", "data").write()
+        .format("iceberg")
+        .option("write-format", "parquet")
+        .mode("append")
+        .save(tableLocation);
+
+    try (CloseableIterable<FileScanTask> tasks = table.newScan().planFiles()) {
+      tasks.forEach(task -> {
+        FileFormat fileFormat = FileFormat.fromFileName(task.file().path());
+        Assert.assertEquals(FileFormat.PARQUET, fileFormat);
+      });
+    }
+  }
+
+  @Test
+  public void testNoWriteFormatOption() throws IOException {
+    String tableLocation = temp.newFolder("iceberg-table").toString();
+
+    HadoopTables tables = new HadoopTables(CONF);
+    PartitionSpec spec = PartitionSpec.unpartitioned();
+    Map<String, String> options = Maps.newHashMap();
+    options.put(TableProperties.DEFAULT_FILE_FORMAT, "avro");
+    Table table = tables.create(SCHEMA, spec, options, tableLocation);
+
+    List<SimpleRecord> expectedRecords = Lists.newArrayList(
+        new SimpleRecord(1, "a"),
+        new SimpleRecord(2, "b"),
+        new SimpleRecord(3, "c")
+    );
+    Dataset<Row> df = spark.createDataFrame(expectedRecords, SimpleRecord.class);
+    df.select("id", "data").write()
+        .format("iceberg")
+        .mode("append")
+        .save(tableLocation);
+
+    try (CloseableIterable<FileScanTask> tasks = table.newScan().planFiles()) {
+      tasks.forEach(task -> {
+        FileFormat fileFormat = FileFormat.fromFileName(task.file().path());
+        Assert.assertEquals(FileFormat.AVRO, fileFormat);
+      });
+    }
+  }
+
+  @Test
+  public void testHadoopOptions() throws IOException {
+    String tableLocation = temp.newFolder("iceberg-table").toString();
+
+    Configuration customHadoopConf = new Configuration(CONF);
+    customHadoopConf.set(ConfigProperties.COMPRESS_METADATA, "true");
+
+    HadoopTables tables = new HadoopTables(customHadoopConf);
+    PartitionSpec spec = PartitionSpec.unpartitioned();
+    Map<String, String> options = Maps.newHashMap();
+    tables.create(SCHEMA, spec, options, tableLocation);
+
+    List<SimpleRecord> expectedRecords = Lists.newArrayList(
+        new SimpleRecord(1, "a"),
+        new SimpleRecord(2, "b")
+    );
+    Dataset<Row> originalDf = spark.createDataFrame(expectedRecords, SimpleRecord.class);
+    originalDf.select("id", "data").write()
+        .format("iceberg")
+        .mode("append")
+        .option("hadoop.iceberg.compress.metadata", "true")
+        .save(tableLocation);
+
+    Dataset<Row> resultDf = spark.read()
+        .format("iceberg")
+        .option("hadoop.iceberg.compress.metadata", "true")
+        .load(tableLocation);
+    List<SimpleRecord> resultRecords = resultDf.orderBy("id")
+        .as(Encoders.bean(SimpleRecord.class))
+        .collectAsList();
+
+    Assert.assertEquals("Records should match", expectedRecords, resultRecords);
+  }
+}
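
One detail worth noting in testHadoopOptions above: the
"hadoop.iceberg.compress.metadata" option is supplied to both the write and
the subsequent read. Each DataSource invocation merges its own options into a
fresh Hadoop Configuration, so a "hadoop."-prefixed option passed on a write
does not carry over to later reads.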