Posted to commits@iceberg.apache.org by bl...@apache.org on 2019/07/12 18:02:35 UTC

[incubator-iceberg] branch master updated: Control metadata compression via table properties (#258)

This is an automated email from the ASF dual-hosted git repository.

blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new bdd7fa4  Control metadata compression via table properties (#258)
bdd7fa4 is described below

commit bdd7fa470434d484ba5c8eb27a73055976be2804
Author: Anton Okolnychyi <ao...@apple.com>
AuthorDate: Fri Jul 12 19:02:30 2019 +0100

    Control metadata compression via table properties (#258)
---
 .../iceberg/BaseMetastoreTableOperations.java      |  6 +-
 .../java/org/apache/iceberg/ConfigProperties.java  | 35 -----------
 .../java/org/apache/iceberg/TableMetadata.java     |  4 ++
 .../org/apache/iceberg/TableMetadataParser.java    | 51 +++++++++++++---
 .../java/org/apache/iceberg/TableProperties.java   |  3 +
 .../iceberg/hadoop/HadoopTableOperations.java      | 70 +++++++++++++---------
 .../iceberg/TableMetadataParserCodecTest.java      | 59 ++++++++++++++++++
 .../apache/iceberg/TableMetadataParserTest.java    | 70 +++++++++++++---------
 .../apache/iceberg/hadoop/HadoopTableTestBase.java |  2 +-
 .../apache/iceberg/hadoop/TestHadoopCommits.java   |  4 +-
 .../org/apache/iceberg/hive/HiveTableBaseTest.java |  3 +-
 site/docs/configuration.md                         |  1 +
 .../source/IcebergSourceFlatDataBenchmark.java     | 11 ++--
 .../source/IcebergSourceNestedDataBenchmark.java   | 11 ++--
 .../spark/source/TestDataSourceOptions.java        | 66 ++++++++++----------
 15 files changed, 250 insertions(+), 146 deletions(-)
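
This commit replaces the Hadoop Configuration flag iceberg.compress.metadata with a per-table property, write.metadata.compression-codec. A minimal sketch of enabling gzip-compressed metadata at table creation time through the HadoopTables API exercised in the benchmark changes below; the schema and table location are placeholders:

    import com.google.common.collect.Maps;
    import java.util.Map;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.iceberg.PartitionSpec;
    import org.apache.iceberg.Schema;
    import org.apache.iceberg.Table;
    import org.apache.iceberg.TableProperties;
    import org.apache.iceberg.hadoop.HadoopTables;
    import org.apache.iceberg.types.Types;

    import static org.apache.iceberg.types.Types.NestedField.optional;

    public class MetadataCompressionExample {
      public static void main(String[] args) {
        Schema schema = new Schema(optional(1, "id", Types.LongType.get()));
        PartitionSpec spec = PartitionSpec.unpartitioned();

        // gzip-compress metadata files written for this table; "none" is the default
        Map<String, String> properties = Maps.newHashMap();
        properties.put(TableProperties.METADATA_COMPRESSION, "gzip");

        HadoopTables tables = new HadoopTables(new Configuration());
        Table table = tables.create(schema, spec, properties, "file:///tmp/db/table"); // placeholder location
        System.out.println(table.location());
      }
    }

Tables created without the property keep the default "none" and continue to write plain .metadata.json files.
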

diff --git a/core/src/main/java/org/apache/iceberg/BaseMetastoreTableOperations.java b/core/src/main/java/org/apache/iceberg/BaseMetastoreTableOperations.java
index f0502b6..f270eaa 100644
--- a/core/src/main/java/org/apache/iceberg/BaseMetastoreTableOperations.java
+++ b/core/src/main/java/org/apache/iceberg/BaseMetastoreTableOperations.java
@@ -142,8 +142,10 @@ public abstract class BaseMetastoreTableOperations implements TableOperations {
   }
 
   private String newTableMetadataFilePath(TableMetadata meta, int newVersion) {
-    return metadataFileLocation(meta,
-        String.format("%05d-%s%s", newVersion, UUID.randomUUID(), TableMetadataParser.getFileExtension(this.conf)));
+    String codecName = meta.property(
+        TableProperties.METADATA_COMPRESSION, TableProperties.METADATA_COMPRESSION_DEFAULT);
+    String fileExtension = TableMetadataParser.getFileExtension(codecName);
+    return metadataFileLocation(meta, String.format("%05d-%s%s", newVersion, UUID.randomUUID(), fileExtension));
   }
 
   private static int parseVersion(String metadataLocation) {
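
With this change, metastore-backed tables derive new metadata file names from the table's own codec property instead of the Hadoop Configuration. An illustrative sketch of the resulting name for version 1 and codec "gzip", mirroring newTableMetadataFilePath(...) above; the UUID is random:

    import java.util.UUID;
    import org.apache.iceberg.TableMetadataParser;

    public class MetadataFileNameExample {
      public static void main(String[] args) {
        // same construction as newTableMetadataFilePath(...) for version 1 and codec "gzip"
        String fileExtension = TableMetadataParser.getFileExtension("gzip");
        String fileName = String.format("%05d-%s%s", 1, UUID.randomUUID(), fileExtension);
        System.out.println(fileName); // e.g. 00001-<random-uuid>.gz.metadata.json
      }
    }
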
diff --git a/core/src/main/java/org/apache/iceberg/ConfigProperties.java b/core/src/main/java/org/apache/iceberg/ConfigProperties.java
deleted file mode 100644
index 0522b77..0000000
--- a/core/src/main/java/org/apache/iceberg/ConfigProperties.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iceberg;
-
-import org.apache.hadoop.conf.Configuration;
-
-public class ConfigProperties {
-
-  private ConfigProperties() {}
-
-  public static final String COMPRESS_METADATA = "iceberg.compress.metadata";
-  public static final boolean COMPRESS_METADATA_DEFAULT = false;
-
-  public static final boolean shouldCompress(Configuration configuration) {
-    return configuration.getBoolean(COMPRESS_METADATA, COMPRESS_METADATA_DEFAULT);
-  }
-
-}
diff --git a/core/src/main/java/org/apache/iceberg/TableMetadata.java b/core/src/main/java/org/apache/iceberg/TableMetadata.java
index 6120909..342fb5a 100644
--- a/core/src/main/java/org/apache/iceberg/TableMetadata.java
+++ b/core/src/main/java/org/apache/iceberg/TableMetadata.java
@@ -231,6 +231,10 @@ public class TableMetadata {
     return properties;
   }
 
+  public String property(String property, String defaultValue) {
+    return properties.getOrDefault(property, defaultValue);
+  }
+
   public boolean propertyAsBoolean(String property, boolean defaultValue) {
     return PropertyUtil.propertyAsBoolean(properties, property, defaultValue);
   }
diff --git a/core/src/main/java/org/apache/iceberg/TableMetadataParser.java b/core/src/main/java/org/apache/iceberg/TableMetadataParser.java
index 96351e6..2df52a0 100644
--- a/core/src/main/java/org/apache/iceberg/TableMetadataParser.java
+++ b/core/src/main/java/org/apache/iceberg/TableMetadataParser.java
@@ -32,11 +32,11 @@ import java.io.StringWriter;
 import java.util.Comparator;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Locale;
 import java.util.Map;
 import java.util.SortedSet;
 import java.util.zip.GZIPInputStream;
 import java.util.zip.GZIPOutputStream;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.iceberg.TableMetadata.SnapshotLogEntry;
 import org.apache.iceberg.exceptions.RuntimeIOException;
 import org.apache.iceberg.io.InputFile;
@@ -45,6 +45,37 @@ import org.apache.iceberg.util.JsonUtil;
 
 public class TableMetadataParser {
 
+  public enum Codec {
+    NONE(""),
+    GZIP(".gz");
+
+    private final String extension;
+
+    Codec(String extension) {
+      this.extension = extension;
+    }
+
+    public static Codec fromName(String codecName) {
+      Preconditions.checkArgument(codecName != null, "Codec name is null");
+      return Codec.valueOf(codecName.toUpperCase(Locale.ENGLISH));
+    }
+
+    public static Codec fromFileName(String fileName) {
+      Preconditions.checkArgument(fileName.contains(".metadata.json"),
+          "%s is not a valid metadata file", fileName);
+      // we have to be backward-compatible with .metadata.json.gz files
+      if (fileName.endsWith(".metadata.json.gz")) {
+        return Codec.GZIP;
+      }
+      String fileNameWithoutSuffix = fileName.substring(0, fileName.lastIndexOf(".metadata.json"));
+      if (fileNameWithoutSuffix.endsWith(Codec.GZIP.extension)) {
+        return Codec.GZIP;
+      } else {
+        return Codec.NONE;
+      }
+    }
+  }
+
   private TableMetadataParser() {}
 
   // visible for testing
@@ -65,10 +96,9 @@ public class TableMetadataParser {
   static final String SNAPSHOT_LOG = "snapshot-log";
 
   public static void write(TableMetadata metadata, OutputFile outputFile) {
+    Codec codec = Codec.fromFileName(outputFile.location());
     try (OutputStreamWriter writer = new OutputStreamWriter(
-        outputFile.location().endsWith(".gz") ?
-            new GZIPOutputStream(outputFile.create()) :
-            outputFile.create())) {
+        codec == Codec.GZIP ? new GZIPOutputStream(outputFile.create()) : outputFile.create())) {
       JsonGenerator generator = JsonUtil.factory().createGenerator(writer);
       generator.useDefaultPrettyPrinter();
       toJson(metadata, generator);
@@ -78,8 +108,12 @@ public class TableMetadataParser {
     }
   }
 
-  public static String getFileExtension(Configuration configuration) {
-    return ConfigProperties.shouldCompress(configuration) ? ".metadata.json.gz" : ".metadata.json";
+  public static String getFileExtension(String codecName) {
+    return getFileExtension(Codec.fromName(codecName));
+  }
+
+  public static String getFileExtension(Codec codec) {
+    return codec.extension + ".metadata.json";
   }
 
   public static String toJson(TableMetadata metadata) {
@@ -146,9 +180,8 @@ public class TableMetadataParser {
   }
 
   public static TableMetadata read(TableOperations ops, InputFile file) {
-    try {
-      InputStream is =
-          file.location().endsWith("gz") ? new GZIPInputStream(file.newStream()) : file.newStream();
+    Codec codec = Codec.fromFileName(file.location());
+    try (InputStream is = codec == Codec.GZIP ? new GZIPInputStream(file.newStream()) : file.newStream()) {
       return fromJson(ops, file, JsonUtil.mapper().readValue(is, JsonNode.class));
     } catch (IOException e) {
       throw new RuntimeIOException(e, "Failed to read file: %s", file);
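
The new Codec enum centralizes the mapping between codec names, file extensions, and metadata file names, and keeps legacy .metadata.json.gz files readable. An illustrative sketch of the expected mappings, mirroring the new TableMetadataParserCodecTest further down:

    import org.apache.iceberg.TableMetadataParser;
    import org.apache.iceberg.TableMetadataParser.Codec;

    public class CodecResolutionExample {
      public static void main(String[] args) {
        // codec names are case-insensitive
        System.out.println(Codec.fromName("gZiP"));                       // GZIP
        System.out.println(Codec.fromName("none"));                       // NONE

        // the codec extension now goes before ".metadata.json"
        System.out.println(TableMetadataParser.getFileExtension("gzip")); // .gz.metadata.json

        // file-name detection covers the new layout and legacy .metadata.json.gz files
        System.out.println(Codec.fromFileName("v3.gz.metadata.json"));    // GZIP
        System.out.println(Codec.fromFileName("v3.metadata.json.gz"));    // GZIP (backward compatible)
        System.out.println(Codec.fromFileName("v3.metadata.json"));       // NONE
      }
    }
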
diff --git a/core/src/main/java/org/apache/iceberg/TableProperties.java b/core/src/main/java/org/apache/iceberg/TableProperties.java
index 9481118..10892b9 100644
--- a/core/src/main/java/org/apache/iceberg/TableProperties.java
+++ b/core/src/main/java/org/apache/iceberg/TableProperties.java
@@ -86,6 +86,9 @@ public class TableProperties {
   public static final String MANIFEST_LISTS_ENABLED = "write.manifest-lists.enabled";
   public static final boolean MANIFEST_LISTS_ENABLED_DEFAULT = true;
 
+  public static final String METADATA_COMPRESSION = "write.metadata.compression-codec";
+  public static final String METADATA_COMPRESSION_DEFAULT = "none";
+
   public static final String WRITE_METADATA_TRUNCATE_BYTES = "write.metadata.truncate-length";
   public static final int WRITE_METADATA_TRUNCATE_BYTES_DEFAULT = 16;
 }
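
Because the codec is now an ordinary table property, it can also be switched on an existing table; later commits then write compressed metadata while Codec.fromFileName keeps older files readable. A hedged sketch assuming the standard Table.updateProperties() API:

    import org.apache.iceberg.Table;
    import org.apache.iceberg.TableProperties;

    public class EnableMetadataCompression {
      // later commits write .gz.metadata.json files; existing uncompressed
      // metadata files stay readable because the codec is detected per file name
      public static void enable(Table table) {
        table.updateProperties()
            .set(TableProperties.METADATA_COMPRESSION, "gzip")
            .commit();
      }
    }
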
diff --git a/core/src/main/java/org/apache/iceberg/hadoop/HadoopTableOperations.java b/core/src/main/java/org/apache/iceberg/hadoop/HadoopTableOperations.java
index 256aa80..0486bdd 100644
--- a/core/src/main/java/org/apache/iceberg/hadoop/HadoopTableOperations.java
+++ b/core/src/main/java/org/apache/iceberg/hadoop/HadoopTableOperations.java
@@ -73,38 +73,37 @@ public class HadoopTableOperations implements TableOperations {
   @Override
   public TableMetadata refresh() {
     int ver = version != null ? version : readVersionHint();
-    Path metadataFile = metadataFile(ver);
-    FileSystem fs = getFileSystem(metadataFile, conf);
     try {
-      // don't check if the file exists if version is non-null because it was already checked
-      if (version == null && !fs.exists(metadataFile)) {
-        if (ver == 0) {
-          // no v0 metadata means the table doesn't exist yet
-          return null;
-        }
-        throw new ValidationException("Metadata file is missing: %s", metadataFile);
+      Path metadataFile = getMetadataFile(ver);
+      if (version == null && metadataFile == null && ver == 0) {
+        // no v0 metadata means the table doesn't exist yet
+        return null;
+      } else if (metadataFile == null) {
+        throw new ValidationException("Metadata file for version %d is missing", ver);
       }
 
-      while (fs.exists(metadataFile(ver + 1))) {
+      Path nextMetadataFile = getMetadataFile(ver + 1);
+      while (nextMetadataFile != null) {
         ver += 1;
-        metadataFile = metadataFile(ver);
+        metadataFile = nextMetadataFile;
+        nextMetadataFile = getMetadataFile(ver + 1);
       }
 
-    } catch (IOException e) {
-      throw new RuntimeIOException(e, "Failed to get file system for path: %s", metadataFile);
-    }
-    this.version = ver;
+      this.version = ver;
 
-    TableMetadata newMetadata = TableMetadataParser.read(this, io().newInputFile(metadataFile.toString()));
-    String newUUID = newMetadata.uuid();
-    if (currentMetadata != null) {
-      Preconditions.checkState(newUUID == null || newUUID.equals(currentMetadata.uuid()),
-          "Table UUID does not match: current=%s != refreshed=%s", currentMetadata.uuid(), newUUID);
-    }
+      TableMetadata newMetadata = TableMetadataParser.read(this, io().newInputFile(metadataFile.toString()));
+      String newUUID = newMetadata.uuid();
+      if (currentMetadata != null) {
+        Preconditions.checkState(newUUID == null || newUUID.equals(currentMetadata.uuid()),
+            "Table UUID does not match: current=%s != refreshed=%s", currentMetadata.uuid(), newUUID);
+      }
 
-    this.currentMetadata = newMetadata;
-    this.shouldRefresh = false;
-    return currentMetadata;
+      this.currentMetadata = newMetadata;
+      this.shouldRefresh = false;
+      return currentMetadata;
+    } catch (IOException e) {
+      throw new RuntimeIOException(e, "Failed to refresh the table");
+    }
   }
 
   @Override
@@ -124,11 +123,15 @@ public class HadoopTableOperations implements TableOperations {
         !metadata.properties().containsKey(TableProperties.WRITE_METADATA_LOCATION),
         "Hadoop path-based tables cannot relocate metadata");
 
-    Path tempMetadataFile = metadataPath(UUID.randomUUID().toString() + TableMetadataParser.getFileExtension(conf));
+    String codecName = metadata.property(
+        TableProperties.METADATA_COMPRESSION, TableProperties.METADATA_COMPRESSION_DEFAULT);
+    TableMetadataParser.Codec codec = TableMetadataParser.Codec.fromName(codecName);
+    String fileExtension = TableMetadataParser.getFileExtension(codec);
+    Path tempMetadataFile = metadataPath(UUID.randomUUID().toString() + fileExtension);
     TableMetadataParser.write(metadata, io().newOutputFile(tempMetadataFile.toString()));
 
     int nextVersion = (version != null ? version : 0) + 1;
-    Path finalMetadataFile = metadataFile(nextVersion);
+    Path finalMetadataFile = metadataFilePath(nextVersion, codec);
     FileSystem fs = getFileSystem(tempMetadataFile, conf);
 
     try {
@@ -168,8 +171,19 @@ public class HadoopTableOperations implements TableOperations {
     return metadataPath(fileName).toString();
   }
 
-  private Path metadataFile(int metadataVersion) {
-    return metadataPath("v" + metadataVersion + TableMetadataParser.getFileExtension(conf));
+  private Path getMetadataFile(int metadataVersion) throws IOException {
+    for (TableMetadataParser.Codec codec : TableMetadataParser.Codec.values()) {
+      Path metadataFile = metadataFilePath(metadataVersion, codec);
+      FileSystem fs = getFileSystem(metadataFile, conf);
+      if (fs.exists(metadataFile)) {
+        return metadataFile;
+      }
+    }
+    return null;
+  }
+
+  private Path metadataFilePath(int metadataVersion, TableMetadataParser.Codec codec) {
+    return metadataPath("v" + metadataVersion + TableMetadataParser.getFileExtension(codec));
   }
 
   private Path metadataPath(String filename) {
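
HadoopTableOperations no longer derives version file names from the Configuration; getMetadataFile probes each codec's candidate name until one exists. A small sketch of the candidates checked for version 3:

    import org.apache.iceberg.TableMetadataParser;
    import org.apache.iceberg.TableMetadataParser.Codec;

    public class VersionFileCandidates {
      public static void main(String[] args) {
        // candidate file names probed by getMetadataFile(3), in Codec declaration order
        for (Codec codec : Codec.values()) {
          System.out.println("v" + 3 + TableMetadataParser.getFileExtension(codec));
        }
        // prints:
        //   v3.metadata.json       (Codec.NONE)
        //   v3.gz.metadata.json    (Codec.GZIP)
      }
    }
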
diff --git a/core/src/test/java/org/apache/iceberg/TableMetadataParserCodecTest.java b/core/src/test/java/org/apache/iceberg/TableMetadataParserCodecTest.java
new file mode 100644
index 0000000..0ffca02
--- /dev/null
+++ b/core/src/test/java/org/apache/iceberg/TableMetadataParserCodecTest.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import org.apache.iceberg.TableMetadataParser.Codec;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+public class TableMetadataParserCodecTest {
+
+  @Rule
+  public ExpectedException exceptionRule = ExpectedException.none();
+
+  @Test
+  public void testCompressionCodec() {
+    Assert.assertEquals(Codec.GZIP, Codec.fromName("gzip"));
+    Assert.assertEquals(Codec.GZIP, Codec.fromName("gZiP"));
+    Assert.assertEquals(Codec.GZIP, Codec.fromFileName("v3.gz.metadata.json"));
+    Assert.assertEquals(Codec.GZIP, Codec.fromFileName("v3-f326-4b66-a541-7b1c.gz.metadata.json"));
+    Assert.assertEquals(Codec.GZIP, Codec.fromFileName("v3-f326-4b66-a541-7b1c.metadata.json.gz"));
+    Assert.assertEquals(Codec.NONE, Codec.fromName("none"));
+    Assert.assertEquals(Codec.NONE, Codec.fromName("nOnE"));
+    Assert.assertEquals(Codec.NONE, Codec.fromFileName("v3.metadata.json"));
+    Assert.assertEquals(Codec.NONE, Codec.fromFileName("v3-f326-4b66-a541-7b1c.metadata.json"));
+  }
+
+  @Test
+  public void testInvalidCodecName() {
+    exceptionRule.expect(IllegalArgumentException.class);
+    exceptionRule.expectMessage("No enum constant");
+    Codec.fromName("invalid");
+  }
+
+  @Test
+  public void testInvalidFileName() {
+    exceptionRule.expect(IllegalArgumentException.class);
+    exceptionRule.expectMessage("path/to/file.parquet is not a valid metadata file");
+    Codec.fromFileName("path/to/file.parquet");
+  }
+}
diff --git a/core/src/test/java/org/apache/iceberg/TableMetadataParserTest.java b/core/src/test/java/org/apache/iceberg/TableMetadataParserTest.java
index 7803877..8171cf9 100644
--- a/core/src/test/java/org/apache/iceberg/TableMetadataParserTest.java
+++ b/core/src/test/java/org/apache/iceberg/TableMetadataParserTest.java
@@ -19,63 +19,77 @@
 
 package org.apache.iceberg;
 
-import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.InputStream;
+import java.nio.file.Path;
 import java.nio.file.Paths;
+import java.util.Map;
 import java.util.zip.GZIPInputStream;
 import java.util.zip.ZipException;
-import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.TableMetadataParser.Codec;
 import org.apache.iceberg.io.OutputFile;
 import org.apache.iceberg.types.Types.BooleanType;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
-import static org.apache.iceberg.ConfigProperties.COMPRESS_METADATA;
 import static org.apache.iceberg.PartitionSpec.unpartitioned;
 import static org.apache.iceberg.TableMetadata.newTableMetadata;
 import static org.apache.iceberg.TableMetadataParser.getFileExtension;
 import static org.apache.iceberg.types.Types.NestedField.optional;
 
+@RunWith(Parameterized.class)
 public class TableMetadataParserTest {
 
-  private static final Schema SCHEMA = new Schema(Lists.newArrayList(optional(1, "b", BooleanType.get())));
-  private static final TableMetadata EXPECTED =
-      newTableMetadata(null, SCHEMA, unpartitioned(), "file://tmp/db/table");
+  private static final Schema SCHEMA = new Schema(optional(1, "b", BooleanType.get()));
+
+  @Parameterized.Parameters
+  public static Object[][] parameters() {
+    return new Object[][] {
+        new Object[] { "none" },
+        new Object[] { "gzip" }
+    };
+  }
+
+  private final String codecName;
+
+  public TableMetadataParserTest(String codecName) {
+    this.codecName = codecName;
+  }
 
   @Test
   public void testCompressionProperty() throws IOException {
-    final boolean[] props = {true, false};
-    final Configuration configuration = new Configuration();
-    for (boolean prop : props) {
-      configuration.setBoolean(COMPRESS_METADATA, prop);
-      final OutputFile outputFile = Files.localOutput(getFileExtension(configuration));
-      TableMetadataParser.write(EXPECTED, outputFile);
-      Assert.assertEquals(prop, isCompressed(getFileExtension(configuration)));
-      final TableMetadata read = TableMetadataParser.read(
-          null, Files.localInput(new File(getFileExtension(configuration))));
-      verifyMetadata(read);
-    }
+    Codec codec = Codec.fromName(codecName);
+    String fileExtension = getFileExtension(codec);
+    String fileName = "v3" + fileExtension;
+    OutputFile outputFile = Files.localOutput(fileName);
+    Map<String, String> properties = Maps.newHashMap();
+    properties.put(TableProperties.METADATA_COMPRESSION, codecName);
+    String location = "file://tmp/db/table";
+    TableMetadata metadata = newTableMetadata(null, SCHEMA, unpartitioned(), location, properties);
+    TableMetadataParser.write(metadata, outputFile);
+    Assert.assertEquals(codec == Codec.GZIP, isCompressed(fileName));
+    TableMetadata actualMetadata = TableMetadataParser.read(null, Files.localInput(new File(fileName)));
+    verifyMetadata(metadata, actualMetadata);
   }
 
   @After
   public void cleanup() throws IOException {
-    final boolean[] props = {true, false};
-    Configuration configuration = new Configuration();
-    for (boolean prop : props) {
-      configuration.setBoolean(COMPRESS_METADATA, prop);
-      java.nio.file.Files.deleteIfExists(Paths.get(getFileExtension(configuration)));
-    }
+    Codec codec = Codec.fromName(codecName);
+    Path metadataFilePath = Paths.get("v3" + getFileExtension(codec));
+    java.nio.file.Files.deleteIfExists(metadataFilePath);
   }
 
-  private void verifyMetadata(TableMetadata read) {
-    Assert.assertEquals(EXPECTED.schema().asStruct(), read.schema().asStruct());
-    Assert.assertEquals(EXPECTED.location(), read.location());
-    Assert.assertEquals(EXPECTED.lastColumnId(), read.lastColumnId());
-    Assert.assertEquals(EXPECTED.properties(), read.properties());
+  private void verifyMetadata(TableMetadata expected, TableMetadata actual) {
+    Assert.assertEquals(expected.schema().asStruct(), actual.schema().asStruct());
+    Assert.assertEquals(expected.location(), actual.location());
+    Assert.assertEquals(expected.lastColumnId(), actual.lastColumnId());
+    Assert.assertEquals(expected.properties(), actual.properties());
   }
 
   private boolean isCompressed(String path) throws IOException {
diff --git a/core/src/test/java/org/apache/iceberg/hadoop/HadoopTableTestBase.java b/core/src/test/java/org/apache/iceberg/hadoop/HadoopTableTestBase.java
index 2a8c2f5..f771df6 100644
--- a/core/src/test/java/org/apache/iceberg/hadoop/HadoopTableTestBase.java
+++ b/core/src/test/java/org/apache/iceberg/hadoop/HadoopTableTestBase.java
@@ -125,7 +125,7 @@ public class HadoopTableTestBase {
   }
 
   File version(int versionNumber) {
-    return new File(metadataDir, "v" + versionNumber + getFileExtension(new Configuration()));
+    return new File(metadataDir, "v" + versionNumber + getFileExtension(TableMetadataParser.Codec.NONE));
   }
 
   TableMetadata readMetadataVersion(int version) {
diff --git a/core/src/test/java/org/apache/iceberg/hadoop/TestHadoopCommits.java b/core/src/test/java/org/apache/iceberg/hadoop/TestHadoopCommits.java
index 99da3d5..1c56123 100644
--- a/core/src/test/java/org/apache/iceberg/hadoop/TestHadoopCommits.java
+++ b/core/src/test/java/org/apache/iceberg/hadoop/TestHadoopCommits.java
@@ -312,7 +312,7 @@ public class TestHadoopCommits extends HadoopTableTestBase {
   @Test
   public void testRenameReturnFalse() throws Exception {
     FileSystem mockFs = mock(FileSystem.class);
-    when(mockFs.exists(any())).thenReturn(false);
+    when(mockFs.exists(any())).thenReturn(true, false);
     when(mockFs.rename(any(), any())).thenReturn(false);
     testRenameWithFileSystem(mockFs);
   }
@@ -320,7 +320,7 @@ public class TestHadoopCommits extends HadoopTableTestBase {
   @Test
   public void testRenameThrow() throws Exception {
     FileSystem mockFs = mock(FileSystem.class);
-    when(mockFs.exists(any())).thenReturn(false);
+    when(mockFs.exists(any())).thenReturn(true, false);
     when(mockFs.rename(any(), any())).thenThrow(new IOException("test injected"));
     testRenameWithFileSystem(mockFs);
   }
diff --git a/hive/src/test/java/org/apache/iceberg/hive/HiveTableBaseTest.java b/hive/src/test/java/org/apache/iceberg/hive/HiveTableBaseTest.java
index 305ca6b..13073ff 100644
--- a/hive/src/test/java/org/apache/iceberg/hive/HiveTableBaseTest.java
+++ b/hive/src/test/java/org/apache/iceberg/hive/HiveTableBaseTest.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
 import org.apache.hadoop.hive.metastore.api.Database;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
+import org.apache.iceberg.TableMetadataParser;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.types.Types;
 import org.junit.After;
@@ -122,7 +123,7 @@ public class HiveTableBaseTest {
   }
 
   protected static List<String> metadataVersionFiles(String tableName) {
-    return filterByExtension(tableName, getFileExtension(hiveConf));
+    return filterByExtension(tableName, getFileExtension(TableMetadataParser.Codec.NONE));
   }
 
   protected static List<String> manifestFiles(String tableName) {
diff --git a/site/docs/configuration.md b/site/docs/configuration.md
index 5c2309b..ccce102 100644
--- a/site/docs/configuration.md
+++ b/site/docs/configuration.md
@@ -22,6 +22,7 @@ Iceberg tables support table properties to configure table behavior, like the de
 | write.parquet.dict-size-bytes      | 2097152 (2 MB)     | Parquet dictionary page size                       |
 | write.parquet.compression-codec    | gzip               | Parquet compression codec                          |
 | write.avro.compression-codec       | gzip               | Avro compression codec                             |
+| write.metadata.compression-codec   | none               | Metadata compression codec; none or gzip           |
 
 
 ### Table behavior properties
diff --git a/spark/src/jmh/java/org/apache/iceberg/spark/source/IcebergSourceFlatDataBenchmark.java b/spark/src/jmh/java/org/apache/iceberg/spark/source/IcebergSourceFlatDataBenchmark.java
index 72346a7..9f9a070 100644
--- a/spark/src/jmh/java/org/apache/iceberg/spark/source/IcebergSourceFlatDataBenchmark.java
+++ b/spark/src/jmh/java/org/apache/iceberg/spark/source/IcebergSourceFlatDataBenchmark.java
@@ -20,11 +20,12 @@
 package org.apache.iceberg.spark.source;
 
 import com.google.common.collect.Maps;
+import java.util.Map;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.iceberg.ConfigProperties;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.hadoop.HadoopTables;
 import org.apache.iceberg.types.Types;
 
@@ -35,9 +36,7 @@ public abstract class IcebergSourceFlatDataBenchmark extends IcebergSourceBenchm
 
   @Override
   protected Configuration initHadoopConf() {
-    Configuration conf = new Configuration();
-    conf.set(ConfigProperties.COMPRESS_METADATA, "true");
-    return conf;
+    return new Configuration();
   }
 
   @Override
@@ -53,6 +52,8 @@ public abstract class IcebergSourceFlatDataBenchmark extends IcebergSourceBenchm
         optional(8, "stringCol", Types.StringType.get()));
     PartitionSpec partitionSpec = PartitionSpec.unpartitioned();
     HadoopTables tables = new HadoopTables(hadoopConf());
-    return tables.create(schema, partitionSpec, Maps.newHashMap(), newTableLocation());
+    Map<String, String> properties = Maps.newHashMap();
+    properties.put(TableProperties.METADATA_COMPRESSION, "gzip");
+    return tables.create(schema, partitionSpec, properties, newTableLocation());
   }
 }
diff --git a/spark/src/jmh/java/org/apache/iceberg/spark/source/IcebergSourceNestedDataBenchmark.java b/spark/src/jmh/java/org/apache/iceberg/spark/source/IcebergSourceNestedDataBenchmark.java
index bc54257..16468a1 100644
--- a/spark/src/jmh/java/org/apache/iceberg/spark/source/IcebergSourceNestedDataBenchmark.java
+++ b/spark/src/jmh/java/org/apache/iceberg/spark/source/IcebergSourceNestedDataBenchmark.java
@@ -20,11 +20,12 @@
 package org.apache.iceberg.spark.source;
 
 import com.google.common.collect.Maps;
+import java.util.Map;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.iceberg.ConfigProperties;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.hadoop.HadoopTables;
 import org.apache.iceberg.types.Types;
 
@@ -35,9 +36,7 @@ public abstract class IcebergSourceNestedDataBenchmark extends IcebergSourceBenc
 
   @Override
   protected Configuration initHadoopConf() {
-    Configuration conf = new Configuration();
-    conf.set(ConfigProperties.COMPRESS_METADATA, "true");
-    return conf;
+    return new Configuration();
   }
 
   @Override
@@ -52,6 +51,8 @@ public abstract class IcebergSourceNestedDataBenchmark extends IcebergSourceBenc
     );
     PartitionSpec partitionSpec = PartitionSpec.unpartitioned();
     HadoopTables tables = new HadoopTables(hadoopConf());
-    return tables.create(schema, partitionSpec, Maps.newHashMap(), newTableLocation());
+    Map<String, String> properties = Maps.newHashMap();
+    properties.put(TableProperties.METADATA_COMPRESSION, "gzip");
+    return tables.create(schema, partitionSpec, properties, newTableLocation());
   }
 }
diff --git a/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java b/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java
index ee21902..cbb5351 100644
--- a/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java
+++ b/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java
@@ -25,7 +25,6 @@ import java.io.IOException;
 import java.util.List;
 import java.util.Map;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.iceberg.ConfigProperties;
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.FileScanTask;
 import org.apache.iceberg.PartitionSpec;
@@ -134,34 +133,41 @@ public class TestDataSourceOptions {
   @Test
   public void testHadoopOptions() throws IOException {
     String tableLocation = temp.newFolder("iceberg-table").toString();
-
-    Configuration customHadoopConf = new Configuration(CONF);
-    customHadoopConf.set(ConfigProperties.COMPRESS_METADATA, "true");
-
-    HadoopTables tables = new HadoopTables(customHadoopConf);
-    PartitionSpec spec = PartitionSpec.unpartitioned();
-    Map<String, String> options = Maps.newHashMap();
-    tables.create(SCHEMA, spec, options, tableLocation);
-
-    List<SimpleRecord> expectedRecords = Lists.newArrayList(
-        new SimpleRecord(1, "a"),
-        new SimpleRecord(2, "b")
-    );
-    Dataset<Row> originalDf = spark.createDataFrame(expectedRecords, SimpleRecord.class);
-    originalDf.select("id", "data").write()
-        .format("iceberg")
-        .mode("append")
-        .option("hadoop.iceberg.compress.metadata", "true")
-        .save(tableLocation);
-
-    Dataset<Row> resultDf = spark.read()
-        .format("iceberg")
-        .option("hadoop.iceberg.compress.metadata", "true")
-        .load(tableLocation);
-    List<SimpleRecord> resultRecords = resultDf.orderBy("id")
-        .as(Encoders.bean(SimpleRecord.class))
-        .collectAsList();
-
-    Assert.assertEquals("Records should match", expectedRecords, resultRecords);
+    Configuration sparkHadoopConf = spark.sparkContext().hadoopConfiguration();
+    String originalDefaultFS = sparkHadoopConf.get("fs.default.name");
+
+    try {
+      HadoopTables tables = new HadoopTables(CONF);
+      PartitionSpec spec = PartitionSpec.unpartitioned();
+      Map<String, String> options = Maps.newHashMap();
+      tables.create(SCHEMA, spec, options, tableLocation);
+
+      // set an invalid value for 'fs.default.name' in Spark Hadoop config
+      // to verify that 'hadoop.' data source options are propagated correctly
+      sparkHadoopConf.set("fs.default.name", "hdfs://localhost:9000");
+
+      List<SimpleRecord> expectedRecords = Lists.newArrayList(
+          new SimpleRecord(1, "a"),
+          new SimpleRecord(2, "b")
+      );
+      Dataset<Row> originalDf = spark.createDataFrame(expectedRecords, SimpleRecord.class);
+      originalDf.select("id", "data").write()
+          .format("iceberg")
+          .mode("append")
+          .option("hadoop.fs.default.name", "file:///")
+          .save(tableLocation);
+
+      Dataset<Row> resultDf = spark.read()
+          .format("iceberg")
+          .option("hadoop.fs.default.name", "file:///")
+          .load(tableLocation);
+      List<SimpleRecord> resultRecords = resultDf.orderBy("id")
+          .as(Encoders.bean(SimpleRecord.class))
+          .collectAsList();
+
+      Assert.assertEquals("Records should match", expectedRecords, resultRecords);
+    } finally {
+      sparkHadoopConf.set("fs.default.name", originalDefaultFS);
+    }
   }
 }