Posted to commits@iceberg.apache.org by bl...@apache.org on 2019/07/12 18:02:35 UTC
[incubator-iceberg] branch master updated: Control metadata compression via table properties (#258)
This is an automated email from the ASF dual-hosted git repository.
blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new bdd7fa4 Control metadata compression via table properties (#258)
bdd7fa4 is described below
commit bdd7fa470434d484ba5c8eb27a73055976be2804
Author: Anton Okolnychyi <ao...@apple.com>
AuthorDate: Fri Jul 12 19:02:30 2019 +0100
Control metadata compression via table properties (#258)
---
.../iceberg/BaseMetastoreTableOperations.java | 6 +-
.../java/org/apache/iceberg/ConfigProperties.java | 35 -----------
.../java/org/apache/iceberg/TableMetadata.java | 4 ++
.../org/apache/iceberg/TableMetadataParser.java | 51 +++++++++++++---
.../java/org/apache/iceberg/TableProperties.java | 3 +
.../iceberg/hadoop/HadoopTableOperations.java | 70 +++++++++++++---------
.../iceberg/TableMetadataParserCodecTest.java | 59 ++++++++++++++++++
.../apache/iceberg/TableMetadataParserTest.java | 70 +++++++++++++---------
.../apache/iceberg/hadoop/HadoopTableTestBase.java | 2 +-
.../apache/iceberg/hadoop/TestHadoopCommits.java | 4 +-
.../org/apache/iceberg/hive/HiveTableBaseTest.java | 3 +-
site/docs/configuration.md | 1 +
.../source/IcebergSourceFlatDataBenchmark.java | 11 ++--
.../source/IcebergSourceNestedDataBenchmark.java | 11 ++--
.../spark/source/TestDataSourceOptions.java | 66 ++++++++++----------
15 files changed, 250 insertions(+), 146 deletions(-)
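In short, this commit replaces the Hadoop-Configuration-driven switch (the removed iceberg.compress.metadata flag) with the table property write.metadata.compression-codec, so compression travels with the table rather than the client config. A minimal sketch of opting in at table-creation time, mirroring the updated benchmarks below; the schema and location are hypothetical:

    import com.google.common.collect.Maps;
    import java.util.Map;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.iceberg.PartitionSpec;
    import org.apache.iceberg.Schema;
    import org.apache.iceberg.Table;
    import org.apache.iceberg.TableProperties;
    import org.apache.iceberg.hadoop.HadoopTables;
    import org.apache.iceberg.types.Types;

    import static org.apache.iceberg.types.Types.NestedField.optional;

    public class MetadataCompressionExample {
      public static void main(String[] args) {
        Schema schema = new Schema(optional(1, "id", Types.LongType.get()));
        Map<String, String> properties = Maps.newHashMap();
        // "none" (the default) or "gzip"
        properties.put(TableProperties.METADATA_COMPRESSION, "gzip");
        HadoopTables tables = new HadoopTables(new Configuration());
        Table table = tables.create(
            schema, PartitionSpec.unpartitioned(), properties, "file:///tmp/db/table");
        // subsequent commits write vN.gz.metadata.json instead of vN.metadata.json
      }
    }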
diff --git a/core/src/main/java/org/apache/iceberg/BaseMetastoreTableOperations.java b/core/src/main/java/org/apache/iceberg/BaseMetastoreTableOperations.java
index f0502b6..f270eaa 100644
--- a/core/src/main/java/org/apache/iceberg/BaseMetastoreTableOperations.java
+++ b/core/src/main/java/org/apache/iceberg/BaseMetastoreTableOperations.java
@@ -142,8 +142,10 @@ public abstract class BaseMetastoreTableOperations implements TableOperations {
}
private String newTableMetadataFilePath(TableMetadata meta, int newVersion) {
- return metadataFileLocation(meta,
- String.format("%05d-%s%s", newVersion, UUID.randomUUID(), TableMetadataParser.getFileExtension(this.conf)));
+ String codecName = meta.property(
+ TableProperties.METADATA_COMPRESSION, TableProperties.METADATA_COMPRESSION_DEFAULT);
+ String fileExtension = TableMetadataParser.getFileExtension(codecName);
+ return metadataFileLocation(meta, String.format("%05d-%s%s", newVersion, UUID.randomUUID(), fileExtension));
}
private static int parseVersion(String metadataLocation) {
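With this hunk, metastore-backed tables derive the metadata file extension from the table's own properties instead of the Hadoop conf. A small sketch of the resulting file name for a gzip table (the UUID is random):

    import java.util.UUID;
    import org.apache.iceberg.TableMetadataParser;

    class MetadataFileNameSketch {
      public static void main(String[] args) {
        // codec name comes from the write.metadata.compression-codec property
        String ext = TableMetadataParser.getFileExtension("gzip"); // ".gz.metadata.json"
        System.out.println(String.format("%05d-%s%s", 1, UUID.randomUUID(), ext));
        // prints something like 00001-<random-uuid>.gz.metadata.json
      }
    }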
diff --git a/core/src/main/java/org/apache/iceberg/ConfigProperties.java b/core/src/main/java/org/apache/iceberg/ConfigProperties.java
deleted file mode 100644
index 0522b77..0000000
--- a/core/src/main/java/org/apache/iceberg/ConfigProperties.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iceberg;
-
-import org.apache.hadoop.conf.Configuration;
-
-public class ConfigProperties {
-
- private ConfigProperties() {}
-
- public static final String COMPRESS_METADATA = "iceberg.compress.metadata";
- public static final boolean COMPRESS_METADATA_DEFAULT = false;
-
- public static final boolean shouldCompress(Configuration configuration) {
- return configuration.getBoolean(COMPRESS_METADATA, COMPRESS_METADATA_DEFAULT);
- }
-
-}
diff --git a/core/src/main/java/org/apache/iceberg/TableMetadata.java b/core/src/main/java/org/apache/iceberg/TableMetadata.java
index 6120909..342fb5a 100644
--- a/core/src/main/java/org/apache/iceberg/TableMetadata.java
+++ b/core/src/main/java/org/apache/iceberg/TableMetadata.java
@@ -231,6 +231,10 @@ public class TableMetadata {
return properties;
}
+ public String property(String property, String defaultValue) {
+ return properties.getOrDefault(property, defaultValue);
+ }
+
public boolean propertyAsBoolean(String property, boolean defaultValue) {
return PropertyUtil.propertyAsBoolean(properties, property, defaultValue);
}
diff --git a/core/src/main/java/org/apache/iceberg/TableMetadataParser.java b/core/src/main/java/org/apache/iceberg/TableMetadataParser.java
index 96351e6..2df52a0 100644
--- a/core/src/main/java/org/apache/iceberg/TableMetadataParser.java
+++ b/core/src/main/java/org/apache/iceberg/TableMetadataParser.java
@@ -32,11 +32,11 @@ import java.io.StringWriter;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
+import java.util.Locale;
import java.util.Map;
import java.util.SortedSet;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;
-import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.TableMetadata.SnapshotLogEntry;
import org.apache.iceberg.exceptions.RuntimeIOException;
import org.apache.iceberg.io.InputFile;
@@ -45,6 +45,37 @@ import org.apache.iceberg.util.JsonUtil;
public class TableMetadataParser {
+ public enum Codec {
+ NONE(""),
+ GZIP(".gz");
+
+ private final String extension;
+
+ Codec(String extension) {
+ this.extension = extension;
+ }
+
+ public static Codec fromName(String codecName) {
+ Preconditions.checkArgument(codecName != null, "Codec name is null");
+ return Codec.valueOf(codecName.toUpperCase(Locale.ENGLISH));
+ }
+
+ public static Codec fromFileName(String fileName) {
+ Preconditions.checkArgument(fileName.contains(".metadata.json"),
+ "%s is not a valid metadata file", fileName);
+ // we have to be backward-compatible with .metadata.json.gz files
+ if (fileName.endsWith(".metadata.json.gz")) {
+ return Codec.GZIP;
+ }
+ String fileNameWithoutSuffix = fileName.substring(0, fileName.lastIndexOf(".metadata.json"));
+ if (fileNameWithoutSuffix.endsWith(Codec.GZIP.extension)) {
+ return Codec.GZIP;
+ } else {
+ return Codec.NONE;
+ }
+ }
+ }
+
private TableMetadataParser() {}
// visible for testing
@@ -65,10 +96,9 @@ public class TableMetadataParser {
static final String SNAPSHOT_LOG = "snapshot-log";
public static void write(TableMetadata metadata, OutputFile outputFile) {
+ Codec codec = Codec.fromFileName(outputFile.location());
try (OutputStreamWriter writer = new OutputStreamWriter(
- outputFile.location().endsWith(".gz") ?
- new GZIPOutputStream(outputFile.create()) :
- outputFile.create())) {
+ codec == Codec.GZIP ? new GZIPOutputStream(outputFile.create()) : outputFile.create())) {
JsonGenerator generator = JsonUtil.factory().createGenerator(writer);
generator.useDefaultPrettyPrinter();
toJson(metadata, generator);
@@ -78,8 +108,12 @@ public class TableMetadataParser {
}
}
- public static String getFileExtension(Configuration configuration) {
- return ConfigProperties.shouldCompress(configuration) ? ".metadata.json.gz" : ".metadata.json";
+ public static String getFileExtension(String codecName) {
+ return getFileExtension(Codec.fromName(codecName));
+ }
+
+ public static String getFileExtension(Codec codec) {
+ return codec.extension + ".metadata.json";
}
public static String toJson(TableMetadata metadata) {
@@ -146,9 +180,8 @@ public class TableMetadataParser {
}
public static TableMetadata read(TableOperations ops, InputFile file) {
- try {
- InputStream is =
- file.location().endsWith("gz") ? new GZIPInputStream(file.newStream()) : file.newStream();
+ Codec codec = Codec.fromFileName(file.location());
+ try (InputStream is = codec == Codec.GZIP ? new GZIPInputStream(file.newStream()) : file.newStream()) {
return fromJson(ops, file, JsonUtil.mapper().readValue(is, JsonNode.class));
} catch (IOException e) {
throw new RuntimeIOException(e, "Failed to read file: %s", file);
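The new Codec enum centralizes name and file-name resolution, including backward compatibility with legacy .metadata.json.gz files. A sketch of the round trip, matching the behavior the new codec test below asserts:

    import org.apache.iceberg.TableMetadataParser;
    import org.apache.iceberg.TableMetadataParser.Codec;

    class CodecRoundTripSketch {
      public static void main(String[] args) {
        Codec gzip = Codec.fromName("gZiP");                      // case-insensitive -> GZIP
        String ext = TableMetadataParser.getFileExtension(gzip);  // ".gz.metadata.json"
        System.out.println(Codec.fromFileName("v3" + ext));       // GZIP
        System.out.println(Codec.fromFileName("v3.metadata.json.gz")); // legacy suffix -> GZIP
        System.out.println(Codec.fromFileName("v3.metadata.json"));    // NONE
      }
    }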
diff --git a/core/src/main/java/org/apache/iceberg/TableProperties.java b/core/src/main/java/org/apache/iceberg/TableProperties.java
index 9481118..10892b9 100644
--- a/core/src/main/java/org/apache/iceberg/TableProperties.java
+++ b/core/src/main/java/org/apache/iceberg/TableProperties.java
@@ -86,6 +86,9 @@ public class TableProperties {
public static final String MANIFEST_LISTS_ENABLED = "write.manifest-lists.enabled";
public static final boolean MANIFEST_LISTS_ENABLED_DEFAULT = true;
+ public static final String METADATA_COMPRESSION = "write.metadata.compression-codec";
+ public static final String METADATA_COMPRESSION_DEFAULT = "none";
+
public static final String WRITE_METADATA_TRUNCATE_BYTES = "write.metadata.truncate-length";
public static final int WRITE_METADATA_TRUNCATE_BYTES_DEFAULT = 16;
}
diff --git a/core/src/main/java/org/apache/iceberg/hadoop/HadoopTableOperations.java b/core/src/main/java/org/apache/iceberg/hadoop/HadoopTableOperations.java
index 256aa80..0486bdd 100644
--- a/core/src/main/java/org/apache/iceberg/hadoop/HadoopTableOperations.java
+++ b/core/src/main/java/org/apache/iceberg/hadoop/HadoopTableOperations.java
@@ -73,38 +73,37 @@ public class HadoopTableOperations implements TableOperations {
@Override
public TableMetadata refresh() {
int ver = version != null ? version : readVersionHint();
- Path metadataFile = metadataFile(ver);
- FileSystem fs = getFileSystem(metadataFile, conf);
try {
- // don't check if the file exists if version is non-null because it was already checked
- if (version == null && !fs.exists(metadataFile)) {
- if (ver == 0) {
- // no v0 metadata means the table doesn't exist yet
- return null;
- }
- throw new ValidationException("Metadata file is missing: %s", metadataFile);
+ Path metadataFile = getMetadataFile(ver);
+ if (version == null && metadataFile == null && ver == 0) {
+ // no v0 metadata means the table doesn't exist yet
+ return null;
+ } else if (metadataFile == null) {
+ throw new ValidationException("Metadata file for version %d is missing", ver);
}
- while (fs.exists(metadataFile(ver + 1))) {
+ Path nextMetadataFile = getMetadataFile(ver + 1);
+ while (nextMetadataFile != null) {
ver += 1;
- metadataFile = metadataFile(ver);
+ metadataFile = nextMetadataFile;
+ nextMetadataFile = getMetadataFile(ver + 1);
}
- } catch (IOException e) {
- throw new RuntimeIOException(e, "Failed to get file system for path: %s", metadataFile);
- }
- this.version = ver;
+ this.version = ver;
- TableMetadata newMetadata = TableMetadataParser.read(this, io().newInputFile(metadataFile.toString()));
- String newUUID = newMetadata.uuid();
- if (currentMetadata != null) {
- Preconditions.checkState(newUUID == null || newUUID.equals(currentMetadata.uuid()),
- "Table UUID does not match: current=%s != refreshed=%s", currentMetadata.uuid(), newUUID);
- }
+ TableMetadata newMetadata = TableMetadataParser.read(this, io().newInputFile(metadataFile.toString()));
+ String newUUID = newMetadata.uuid();
+ if (currentMetadata != null) {
+ Preconditions.checkState(newUUID == null || newUUID.equals(currentMetadata.uuid()),
+ "Table UUID does not match: current=%s != refreshed=%s", currentMetadata.uuid(), newUUID);
+ }
- this.currentMetadata = newMetadata;
- this.shouldRefresh = false;
- return currentMetadata;
+ this.currentMetadata = newMetadata;
+ this.shouldRefresh = false;
+ return currentMetadata;
+ } catch (IOException e) {
+ throw new RuntimeIOException(e, "Failed to refresh the table");
+ }
}
@Override
@@ -124,11 +123,15 @@ public class HadoopTableOperations implements TableOperations {
!metadata.properties().containsKey(TableProperties.WRITE_METADATA_LOCATION),
"Hadoop path-based tables cannot relocate metadata");
- Path tempMetadataFile = metadataPath(UUID.randomUUID().toString() + TableMetadataParser.getFileExtension(conf));
+ String codecName = metadata.property(
+ TableProperties.METADATA_COMPRESSION, TableProperties.METADATA_COMPRESSION_DEFAULT);
+ TableMetadataParser.Codec codec = TableMetadataParser.Codec.fromName(codecName);
+ String fileExtension = TableMetadataParser.getFileExtension(codec);
+ Path tempMetadataFile = metadataPath(UUID.randomUUID().toString() + fileExtension);
TableMetadataParser.write(metadata, io().newOutputFile(tempMetadataFile.toString()));
int nextVersion = (version != null ? version : 0) + 1;
- Path finalMetadataFile = metadataFile(nextVersion);
+ Path finalMetadataFile = metadataFilePath(nextVersion, codec);
FileSystem fs = getFileSystem(tempMetadataFile, conf);
try {
@@ -168,8 +171,19 @@ public class HadoopTableOperations implements TableOperations {
return metadataPath(fileName).toString();
}
- private Path metadataFile(int metadataVersion) {
- return metadataPath("v" + metadataVersion + TableMetadataParser.getFileExtension(conf));
+ private Path getMetadataFile(int metadataVersion) throws IOException {
+ for (TableMetadataParser.Codec codec : TableMetadataParser.Codec.values()) {
+ Path metadataFile = metadataFilePath(metadataVersion, codec);
+ FileSystem fs = getFileSystem(metadataFile, conf);
+ if (fs.exists(metadataFile)) {
+ return metadataFile;
+ }
+ }
+ return null;
+ }
+
+ private Path metadataFilePath(int metadataVersion, TableMetadataParser.Codec codec) {
+ return metadataPath("v" + metadataVersion + TableMetadataParser.getFileExtension(codec));
}
private Path metadataPath(String filename) {
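Because the codec can change over a table's lifetime, version resolution in HadoopTableOperations now probes every known extension instead of assuming one. A sketch of the candidate names getMetadataFile(3) checks, in enum declaration order:

    import org.apache.iceberg.TableMetadataParser;

    class MetadataCandidatesSketch {
      public static void main(String[] args) {
        for (TableMetadataParser.Codec codec : TableMetadataParser.Codec.values()) {
          System.out.println("v3" + TableMetadataParser.getFileExtension(codec));
        }
        // v3.metadata.json     (NONE)
        // v3.gz.metadata.json  (GZIP)
      }
    }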
diff --git a/core/src/test/java/org/apache/iceberg/TableMetadataParserCodecTest.java b/core/src/test/java/org/apache/iceberg/TableMetadataParserCodecTest.java
new file mode 100644
index 0000000..0ffca02
--- /dev/null
+++ b/core/src/test/java/org/apache/iceberg/TableMetadataParserCodecTest.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import org.apache.iceberg.TableMetadataParser.Codec;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+public class TableMetadataParserCodecTest {
+
+ @Rule
+ public ExpectedException exceptionRule = ExpectedException.none();
+
+ @Test
+ public void testCompressionCodec() {
+ Assert.assertEquals(Codec.GZIP, Codec.fromName("gzip"));
+ Assert.assertEquals(Codec.GZIP, Codec.fromName("gZiP"));
+ Assert.assertEquals(Codec.GZIP, Codec.fromFileName("v3.gz.metadata.json"));
+ Assert.assertEquals(Codec.GZIP, Codec.fromFileName("v3-f326-4b66-a541-7b1c.gz.metadata.json"));
+ Assert.assertEquals(Codec.GZIP, Codec.fromFileName("v3-f326-4b66-a541-7b1c.metadata.json.gz"));
+ Assert.assertEquals(Codec.NONE, Codec.fromName("none"));
+ Assert.assertEquals(Codec.NONE, Codec.fromName("nOnE"));
+ Assert.assertEquals(Codec.NONE, Codec.fromFileName("v3.metadata.json"));
+ Assert.assertEquals(Codec.NONE, Codec.fromFileName("v3-f326-4b66-a541-7b1c.metadata.json"));
+ }
+
+ @Test
+ public void testInvalidCodecName() {
+ exceptionRule.expect(IllegalArgumentException.class);
+ exceptionRule.expectMessage("No enum constant");
+ Codec.fromName("invalid");
+ }
+
+ @Test
+ public void testInvalidFileName() {
+ exceptionRule.expect(IllegalArgumentException.class);
+ exceptionRule.expectMessage("path/to/file.parquet is not a valid metadata file");
+ Codec.fromFileName("path/to/file.parquet");
+ }
+}
diff --git a/core/src/test/java/org/apache/iceberg/TableMetadataParserTest.java b/core/src/test/java/org/apache/iceberg/TableMetadataParserTest.java
index 7803877..8171cf9 100644
--- a/core/src/test/java/org/apache/iceberg/TableMetadataParserTest.java
+++ b/core/src/test/java/org/apache/iceberg/TableMetadataParserTest.java
@@ -19,63 +19,77 @@
package org.apache.iceberg;
-import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
+import java.nio.file.Path;
import java.nio.file.Paths;
+import java.util.Map;
import java.util.zip.GZIPInputStream;
import java.util.zip.ZipException;
-import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.TableMetadataParser.Codec;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.types.Types.BooleanType;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
-import static org.apache.iceberg.ConfigProperties.COMPRESS_METADATA;
import static org.apache.iceberg.PartitionSpec.unpartitioned;
import static org.apache.iceberg.TableMetadata.newTableMetadata;
import static org.apache.iceberg.TableMetadataParser.getFileExtension;
import static org.apache.iceberg.types.Types.NestedField.optional;
+@RunWith(Parameterized.class)
public class TableMetadataParserTest {
- private static final Schema SCHEMA = new Schema(Lists.newArrayList(optional(1, "b", BooleanType.get())));
- private static final TableMetadata EXPECTED =
- newTableMetadata(null, SCHEMA, unpartitioned(), "file://tmp/db/table");
+ private static final Schema SCHEMA = new Schema(optional(1, "b", BooleanType.get()));
+
+ @Parameterized.Parameters
+ public static Object[][] parameters() {
+ return new Object[][] {
+ new Object[] { "none" },
+ new Object[] { "gzip" }
+ };
+ }
+
+ private final String codecName;
+
+ public TableMetadataParserTest(String codecName) {
+ this.codecName = codecName;
+ }
@Test
public void testCompressionProperty() throws IOException {
- final boolean[] props = {true, false};
- final Configuration configuration = new Configuration();
- for (boolean prop : props) {
- configuration.setBoolean(COMPRESS_METADATA, prop);
- final OutputFile outputFile = Files.localOutput(getFileExtension(configuration));
- TableMetadataParser.write(EXPECTED, outputFile);
- Assert.assertEquals(prop, isCompressed(getFileExtension(configuration)));
- final TableMetadata read = TableMetadataParser.read(
- null, Files.localInput(new File(getFileExtension(configuration))));
- verifyMetadata(read);
- }
+ Codec codec = Codec.fromName(codecName);
+ String fileExtension = getFileExtension(codec);
+ String fileName = "v3" + fileExtension;
+ OutputFile outputFile = Files.localOutput(fileName);
+ Map<String, String> properties = Maps.newHashMap();
+ properties.put(TableProperties.METADATA_COMPRESSION, codecName);
+ String location = "file://tmp/db/table";
+ TableMetadata metadata = newTableMetadata(null, SCHEMA, unpartitioned(), location, properties);
+ TableMetadataParser.write(metadata, outputFile);
+ Assert.assertEquals(codec == Codec.GZIP, isCompressed(fileName));
+ TableMetadata actualMetadata = TableMetadataParser.read(null, Files.localInput(new File(fileName)));
+ verifyMetadata(metadata, actualMetadata);
}
@After
public void cleanup() throws IOException {
- final boolean[] props = {true, false};
- Configuration configuration = new Configuration();
- for (boolean prop : props) {
- configuration.setBoolean(COMPRESS_METADATA, prop);
- java.nio.file.Files.deleteIfExists(Paths.get(getFileExtension(configuration)));
- }
+ Codec codec = Codec.fromName(codecName);
+ Path metadataFilePath = Paths.get("v3" + getFileExtension(codec));
+ java.nio.file.Files.deleteIfExists(metadataFilePath);
}
- private void verifyMetadata(TableMetadata read) {
- Assert.assertEquals(EXPECTED.schema().asStruct(), read.schema().asStruct());
- Assert.assertEquals(EXPECTED.location(), read.location());
- Assert.assertEquals(EXPECTED.lastColumnId(), read.lastColumnId());
- Assert.assertEquals(EXPECTED.properties(), read.properties());
+ private void verifyMetadata(TableMetadata expected, TableMetadata actual) {
+ Assert.assertEquals(expected.schema().asStruct(), actual.schema().asStruct());
+ Assert.assertEquals(expected.location(), actual.location());
+ Assert.assertEquals(expected.lastColumnId(), actual.lastColumnId());
+ Assert.assertEquals(expected.properties(), actual.properties());
}
private boolean isCompressed(String path) throws IOException {
diff --git a/core/src/test/java/org/apache/iceberg/hadoop/HadoopTableTestBase.java b/core/src/test/java/org/apache/iceberg/hadoop/HadoopTableTestBase.java
index 2a8c2f5..f771df6 100644
--- a/core/src/test/java/org/apache/iceberg/hadoop/HadoopTableTestBase.java
+++ b/core/src/test/java/org/apache/iceberg/hadoop/HadoopTableTestBase.java
@@ -125,7 +125,7 @@ public class HadoopTableTestBase {
}
File version(int versionNumber) {
- return new File(metadataDir, "v" + versionNumber + getFileExtension(new Configuration()));
+ return new File(metadataDir, "v" + versionNumber + getFileExtension(TableMetadataParser.Codec.NONE));
}
TableMetadata readMetadataVersion(int version) {
diff --git a/core/src/test/java/org/apache/iceberg/hadoop/TestHadoopCommits.java b/core/src/test/java/org/apache/iceberg/hadoop/TestHadoopCommits.java
index 99da3d5..1c56123 100644
--- a/core/src/test/java/org/apache/iceberg/hadoop/TestHadoopCommits.java
+++ b/core/src/test/java/org/apache/iceberg/hadoop/TestHadoopCommits.java
@@ -312,7 +312,7 @@ public class TestHadoopCommits extends HadoopTableTestBase {
@Test
public void testRenameReturnFalse() throws Exception {
FileSystem mockFs = mock(FileSystem.class);
- when(mockFs.exists(any())).thenReturn(false);
+ when(mockFs.exists(any())).thenReturn(true, false);
when(mockFs.rename(any(), any())).thenReturn(false);
testRenameWithFileSystem(mockFs);
}
@@ -320,7 +320,7 @@ public class TestHadoopCommits extends HadoopTableTestBase {
@Test
public void testRenameThrow() throws Exception {
FileSystem mockFs = mock(FileSystem.class);
- when(mockFs.exists(any())).thenReturn(false);
+ when(mockFs.exists(any())).thenReturn(true, false);
when(mockFs.rename(any(), any())).thenThrow(new IOException("test injected"));
testRenameWithFileSystem(mockFs);
}
diff --git a/hive/src/test/java/org/apache/iceberg/hive/HiveTableBaseTest.java b/hive/src/test/java/org/apache/iceberg/hive/HiveTableBaseTest.java
index 305ca6b..13073ff 100644
--- a/hive/src/test/java/org/apache/iceberg/hive/HiveTableBaseTest.java
+++ b/hive/src/test/java/org/apache/iceberg/hive/HiveTableBaseTest.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.Database;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
+import org.apache.iceberg.TableMetadataParser;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.types.Types;
import org.junit.After;
@@ -122,7 +123,7 @@ public class HiveTableBaseTest {
}
protected static List<String> metadataVersionFiles(String tableName) {
- return filterByExtension(tableName, getFileExtension(hiveConf));
+ return filterByExtension(tableName, getFileExtension(TableMetadataParser.Codec.NONE));
}
protected static List<String> manifestFiles(String tableName) {
diff --git a/site/docs/configuration.md b/site/docs/configuration.md
index 5c2309b..ccce102 100644
--- a/site/docs/configuration.md
+++ b/site/docs/configuration.md
@@ -22,6 +22,7 @@ Iceberg tables support table properties to configure table behavior, like the de
| write.parquet.dict-size-bytes | 2097152 (2 MB) | Parquet dictionary page size |
| write.parquet.compression-codec | gzip | Parquet compression codec |
| write.avro.compression-codec | gzip | Avro compression codec |
+| write.metadata.compression-codec | none | Metadata compression codec; none or gzip |
### Table behavior properties
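Since this is an ordinary table property, it can also be toggled on an existing table through the core API; a sketch (not part of this commit), assuming a Table handle supplied by the caller:

    import org.apache.iceberg.Table;
    import org.apache.iceberg.TableProperties;

    class EnableGzipMetadata {
      // sketch: switch an existing table's metadata codec
      static void enableGzip(Table table) {
        table.updateProperties()
            .set(TableProperties.METADATA_COMPRESSION, "gzip")
            .commit();
        // only metadata written after this commit uses the .gz.metadata.json extension;
        // existing metadata files are left as-is and remain readable
      }
    }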
diff --git a/spark/src/jmh/java/org/apache/iceberg/spark/source/IcebergSourceFlatDataBenchmark.java b/spark/src/jmh/java/org/apache/iceberg/spark/source/IcebergSourceFlatDataBenchmark.java
index 72346a7..9f9a070 100644
--- a/spark/src/jmh/java/org/apache/iceberg/spark/source/IcebergSourceFlatDataBenchmark.java
+++ b/spark/src/jmh/java/org/apache/iceberg/spark/source/IcebergSourceFlatDataBenchmark.java
@@ -20,11 +20,12 @@
package org.apache.iceberg.spark.source;
import com.google.common.collect.Maps;
+import java.util.Map;
import org.apache.hadoop.conf.Configuration;
-import org.apache.iceberg.ConfigProperties;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
import org.apache.iceberg.hadoop.HadoopTables;
import org.apache.iceberg.types.Types;
@@ -35,9 +36,7 @@ public abstract class IcebergSourceFlatDataBenchmark extends IcebergSourceBenchm
@Override
protected Configuration initHadoopConf() {
- Configuration conf = new Configuration();
- conf.set(ConfigProperties.COMPRESS_METADATA, "true");
- return conf;
+ return new Configuration();
}
@Override
@@ -53,6 +52,8 @@ public abstract class IcebergSourceFlatDataBenchmark extends IcebergSourceBenchm
optional(8, "stringCol", Types.StringType.get()));
PartitionSpec partitionSpec = PartitionSpec.unpartitioned();
HadoopTables tables = new HadoopTables(hadoopConf());
- return tables.create(schema, partitionSpec, Maps.newHashMap(), newTableLocation());
+ Map<String, String> properties = Maps.newHashMap();
+ properties.put(TableProperties.METADATA_COMPRESSION, "gzip");
+ return tables.create(schema, partitionSpec, properties, newTableLocation());
}
}
diff --git a/spark/src/jmh/java/org/apache/iceberg/spark/source/IcebergSourceNestedDataBenchmark.java b/spark/src/jmh/java/org/apache/iceberg/spark/source/IcebergSourceNestedDataBenchmark.java
index bc54257..16468a1 100644
--- a/spark/src/jmh/java/org/apache/iceberg/spark/source/IcebergSourceNestedDataBenchmark.java
+++ b/spark/src/jmh/java/org/apache/iceberg/spark/source/IcebergSourceNestedDataBenchmark.java
@@ -20,11 +20,12 @@
package org.apache.iceberg.spark.source;
import com.google.common.collect.Maps;
+import java.util.Map;
import org.apache.hadoop.conf.Configuration;
-import org.apache.iceberg.ConfigProperties;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
import org.apache.iceberg.hadoop.HadoopTables;
import org.apache.iceberg.types.Types;
@@ -35,9 +36,7 @@ public abstract class IcebergSourceNestedDataBenchmark extends IcebergSourceBenc
@Override
protected Configuration initHadoopConf() {
- Configuration conf = new Configuration();
- conf.set(ConfigProperties.COMPRESS_METADATA, "true");
- return conf;
+ return new Configuration();
}
@Override
@@ -52,6 +51,8 @@ public abstract class IcebergSourceNestedDataBenchmark extends IcebergSourceBenc
);
PartitionSpec partitionSpec = PartitionSpec.unpartitioned();
HadoopTables tables = new HadoopTables(hadoopConf());
- return tables.create(schema, partitionSpec, Maps.newHashMap(), newTableLocation());
+ Map<String, String> properties = Maps.newHashMap();
+ properties.put(TableProperties.METADATA_COMPRESSION, "gzip");
+ return tables.create(schema, partitionSpec, properties, newTableLocation());
}
}
diff --git a/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java b/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java
index ee21902..cbb5351 100644
--- a/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java
+++ b/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java
@@ -25,7 +25,6 @@ import java.io.IOException;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
-import org.apache.iceberg.ConfigProperties;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.PartitionSpec;
@@ -134,34 +133,41 @@ public class TestDataSourceOptions {
@Test
public void testHadoopOptions() throws IOException {
String tableLocation = temp.newFolder("iceberg-table").toString();
-
- Configuration customHadoopConf = new Configuration(CONF);
- customHadoopConf.set(ConfigProperties.COMPRESS_METADATA, "true");
-
- HadoopTables tables = new HadoopTables(customHadoopConf);
- PartitionSpec spec = PartitionSpec.unpartitioned();
- Map<String, String> options = Maps.newHashMap();
- tables.create(SCHEMA, spec, options, tableLocation);
-
- List<SimpleRecord> expectedRecords = Lists.newArrayList(
- new SimpleRecord(1, "a"),
- new SimpleRecord(2, "b")
- );
- Dataset<Row> originalDf = spark.createDataFrame(expectedRecords, SimpleRecord.class);
- originalDf.select("id", "data").write()
- .format("iceberg")
- .mode("append")
- .option("hadoop.iceberg.compress.metadata", "true")
- .save(tableLocation);
-
- Dataset<Row> resultDf = spark.read()
- .format("iceberg")
- .option("hadoop.iceberg.compress.metadata", "true")
- .load(tableLocation);
- List<SimpleRecord> resultRecords = resultDf.orderBy("id")
- .as(Encoders.bean(SimpleRecord.class))
- .collectAsList();
-
- Assert.assertEquals("Records should match", expectedRecords, resultRecords);
+ Configuration sparkHadoopConf = spark.sparkContext().hadoopConfiguration();
+ String originalDefaultFS = sparkHadoopConf.get("fs.default.name");
+
+ try {
+ HadoopTables tables = new HadoopTables(CONF);
+ PartitionSpec spec = PartitionSpec.unpartitioned();
+ Map<String, String> options = Maps.newHashMap();
+ tables.create(SCHEMA, spec, options, tableLocation);
+
+ // set an invalid value for 'fs.default.name' in Spark Hadoop config
+ // to verify that 'hadoop.' data source options are propagated correctly
+ sparkHadoopConf.set("fs.default.name", "hdfs://localhost:9000");
+
+ List<SimpleRecord> expectedRecords = Lists.newArrayList(
+ new SimpleRecord(1, "a"),
+ new SimpleRecord(2, "b")
+ );
+ Dataset<Row> originalDf = spark.createDataFrame(expectedRecords, SimpleRecord.class);
+ originalDf.select("id", "data").write()
+ .format("iceberg")
+ .mode("append")
+ .option("hadoop.fs.default.name", "file:///")
+ .save(tableLocation);
+
+ Dataset<Row> resultDf = spark.read()
+ .format("iceberg")
+ .option("hadoop.fs.default.name", "file:///")
+ .load(tableLocation);
+ List<SimpleRecord> resultRecords = resultDf.orderBy("id")
+ .as(Encoders.bean(SimpleRecord.class))
+ .collectAsList();
+
+ Assert.assertEquals("Records should match", expectedRecords, resultRecords);
+ } finally {
+ sparkHadoopConf.set("fs.default.name", originalDefaultFS);
+ }
}
}
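The rewritten test no longer exercises the removed compression flag; instead it checks the general contract that "hadoop."-prefixed data source options are applied to the Hadoop Configuration used for that read or write, overriding the Spark session's values. A sketch of that usage, with spark and tableLocation assumed to exist:

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;

    class HadoopOptionSketch {
      // sketch: a "hadoop."-prefixed option overrides the session's Hadoop conf
      // for this job only, as the rewritten test verifies
      static Dataset<Row> readWithLocalFs(SparkSession spark, String tableLocation) {
        return spark.read()
            .format("iceberg")
            .option("hadoop.fs.default.name", "file:///")
            .load(tableLocation);
      }
    }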