You are viewing a plain-text version of this content. (The link to the canonical version was not preserved in this plain-text copy.)
Posted to commits@iceberg.apache.org by ao...@apache.org on 2019/10/01 07:48:33 UTC
[incubator-iceberg] branch master updated: Spark: Remove flaky Hive table tests (#509)
This is an automated email from the ASF dual-hosted git repository.
aokolnychyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new c797060 Spark: Remove flaky Hive table tests (#509)
c797060 is described below
commit c79706052a2ec3f261efc621214792495312154d
Author: Ryan Blue <rd...@users.noreply.github.com>
AuthorDate: Tue Oct 1 00:48:28 2019 -0700
Spark: Remove flaky Hive table tests (#509)
---
.../spark/source/TestIcebergSourceHiveTables.java | 386 +--------------------
1 file changed, 6 insertions(+), 380 deletions(-)
diff --git a/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceHiveTables.java b/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceHiveTables.java
index 2a8bf79..7b95659 100644
--- a/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceHiveTables.java
+++ b/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceHiveTables.java
@@ -19,30 +19,17 @@
package org.apache.iceberg.spark.source;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import java.util.HashMap;
import java.util.List;
-import org.apache.avro.generic.GenericData;
-import org.apache.avro.generic.GenericRecordBuilder;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.api.Database;
-import org.apache.iceberg.DataFile;
-import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
-import org.apache.iceberg.Table;
-import org.apache.iceberg.avro.Avro;
-import org.apache.iceberg.avro.AvroSchemaUtil;
import org.apache.iceberg.catalog.TableIdentifier;
-import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.hive.HiveCatalog;
import org.apache.iceberg.hive.HiveClientPool;
import org.apache.iceberg.hive.TestHiveMetastore;
-import org.apache.iceberg.io.CloseableIterable;
-import org.apache.iceberg.io.InputFile;
-import org.apache.iceberg.spark.data.TestHelpers;
import org.apache.iceberg.types.Types;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
@@ -104,7 +91,7 @@ public class TestIcebergSourceHiveTables {
}
@Test
- public synchronized void testHiveTablesSupport() throws Exception {
+ public void testHiveTablesSupport() throws Exception {
TableIdentifier tableIdentifier = TableIdentifier.of("db", "table");
try {
catalog.createTable(tableIdentifier, SCHEMA, PartitionSpec.unpartitioned());
@@ -128,375 +115,14 @@ public class TestIcebergSourceHiveTables {
.collectAsList();
Assert.assertEquals("Records should match", expectedRecords, actualRecords);
- } finally {
- clients.run(client -> {
- client.dropTable(tableIdentifier.namespace().level(0), tableIdentifier.name());
- return null;
- });
- }
- }
-
- @Test
- public synchronized void testHiveEntriesTable() throws Exception {
- TableIdentifier tableIdentifier = TableIdentifier.of("db", "entries_test");
- try {
- Table table = catalog.createTable(tableIdentifier, SCHEMA, PartitionSpec.unpartitioned());
- Table entriesTable = catalog.loadTable(TableIdentifier.of("db", "entries_test", "entries"));
-
- List<SimpleRecord> records = Lists.newArrayList(new SimpleRecord(1, "1"));
-
- Dataset<Row> inputDf = spark.createDataFrame(records, SimpleRecord.class);
- inputDf.select("id", "data").write()
- .format("iceberg")
- .mode("append")
- .save(tableIdentifier.toString());
-
- table.refresh();
-
- List<Row> actual = spark.read()
- .format("iceberg")
- .load("db.entries_test.entries")
- .collectAsList();
- Assert.assertEquals("Should only contain one manifest", 1, table.currentSnapshot().manifests().size());
- InputFile manifest = table.io().newInputFile(table.currentSnapshot().manifests().get(0).path());
- List<GenericData.Record> expected;
- try (CloseableIterable<GenericData.Record> rows = Avro.read(manifest).project(entriesTable.schema()).build()) {
- expected = Lists.newArrayList(rows);
+ // test metadata table name resolution
+ for (String metadataName : new String[] {"history", "snapshots", "files", "entries", "manifests"}) {
+ String metadataTableName = tableIdentifier.toString() + "." + metadataName;
+ Dataset<Row> df = spark.read().format("iceberg").load(metadataTableName);
+ Assert.assertNotNull("Should resolve metadata table: " + metadataName, df);
}
- Assert.assertEquals("Entries table should have one row", 1, expected.size());
- Assert.assertEquals("Actual results should have one row", 1, actual.size());
- TestHelpers.assertEqualsSafe(entriesTable.schema().asStruct(), expected.get(0), actual.get(0));
-
- } finally {
- clients.run(client -> {
- client.dropTable(tableIdentifier.namespace().level(0), tableIdentifier.name());
- return null;
- });
- }
- }
-
- @Test
- public synchronized void testHiveFilesTable() throws Exception {
- TableIdentifier tableIdentifier = TableIdentifier.of("db", "files_test");
- try {
- Table table = catalog.createTable(tableIdentifier, SCHEMA,
- PartitionSpec.builderFor(SCHEMA).identity("id").build());
- Table entriesTable = catalog.loadTable(TableIdentifier.of("db", "files_test", "entries"));
- Table filesTable = catalog.loadTable(TableIdentifier.of("db", "files_test", "files"));
-
- Dataset<Row> df1 = spark.createDataFrame(Lists.newArrayList(new SimpleRecord(1, "a")), SimpleRecord.class);
- Dataset<Row> df2 = spark.createDataFrame(Lists.newArrayList(new SimpleRecord(2, "b")), SimpleRecord.class);
-
- df1.select("id", "data").write()
- .format("iceberg")
- .mode("append")
- .save(tableIdentifier.toString());
-
- // add a second file
- df2.select("id", "data").write()
- .format("iceberg")
- .mode("append")
- .save(tableIdentifier.toString());
-
- // delete the first file to test that only live files are listed
- table.newDelete().deleteFromRowFilter(Expressions.equal("id", 1)).commit();
-
- List<Row> actual = spark.read()
- .format("iceberg")
- .load("db.files_test.files")
- .collectAsList();
-
- List<GenericData.Record> expected = Lists.newArrayList();
- for (ManifestFile manifest : table.currentSnapshot().manifests()) {
- InputFile in = table.io().newInputFile(manifest.path());
- try (CloseableIterable<GenericData.Record> rows = Avro.read(in).project(entriesTable.schema()).build()) {
- for (GenericData.Record record : rows) {
- if ((Integer) record.get("status") < 2 /* added or existing */) {
- expected.add((GenericData.Record) record.get("data_file"));
- }
- }
- }
- }
-
- Assert.assertEquals("Files table should have one row", 1, expected.size());
- Assert.assertEquals("Actual results should have one row", 1, actual.size());
- TestHelpers.assertEqualsSafe(filesTable.schema().asStruct(), expected.get(0), actual.get(0));
-
- } finally {
- clients.run(client -> {
- client.dropTable(tableIdentifier.namespace().level(0), tableIdentifier.name());
- return null;
- });
- }
- }
-
- @Test
- public synchronized void testHiveFilesUnpartitionedTable() throws Exception {
- TableIdentifier tableIdentifier = TableIdentifier.of("db", "unpartitioned_files_test");
- try {
- Table table = catalog.createTable(tableIdentifier, SCHEMA);
- Table entriesTable = catalog.loadTable(TableIdentifier.of("db", "unpartitioned_files_test", "entries"));
- Table filesTable = catalog.loadTable(TableIdentifier.of("db", "unpartitioned_files_test", "files"));
-
- Dataset<Row> df1 = spark.createDataFrame(Lists.newArrayList(new SimpleRecord(1, "a")), SimpleRecord.class);
- Dataset<Row> df2 = spark.createDataFrame(Lists.newArrayList(new SimpleRecord(2, "b")), SimpleRecord.class);
-
- df1.select("id", "data").write()
- .format("iceberg")
- .mode("append")
- .save(tableIdentifier.toString());
-
- table.refresh();
- DataFile toDelete = Iterables.getOnlyElement(table.currentSnapshot().addedFiles());
-
- // add a second file
- df2.select("id", "data").write()
- .format("iceberg")
- .mode("append")
- .save(tableIdentifier.toString());
-
- // delete the first file to test that only live files are listed
- table.newDelete().deleteFile(toDelete).commit();
-
- List<Row> actual = spark.read()
- .format("iceberg")
- .load("db.unpartitioned_files_test.files")
- .collectAsList();
-
- List<GenericData.Record> expected = Lists.newArrayList();
- for (ManifestFile manifest : table.currentSnapshot().manifests()) {
- InputFile in = table.io().newInputFile(manifest.path());
- try (CloseableIterable<GenericData.Record> rows = Avro.read(in).project(entriesTable.schema()).build()) {
- for (GenericData.Record record : rows) {
- if ((Integer) record.get("status") < 2 /* added or existing */) {
- expected.add((GenericData.Record) record.get("data_file"));
- }
- }
- }
- }
-
- Assert.assertEquals("Files table should have one row", 1, expected.size());
- Assert.assertEquals("Actual results should have one row", 1, actual.size());
- TestHelpers.assertEqualsSafe(filesTable.schema().asStruct(), expected.get(0), actual.get(0));
-
- } finally {
- clients.run(client -> {
- client.dropTable(tableIdentifier.namespace().level(0), tableIdentifier.name());
- return null;
- });
- }
- }
-
- @Test
- public synchronized void testHiveHistoryTable() throws Exception {
- TableIdentifier tableIdentifier = TableIdentifier.of("db", "history_test");
- try {
- Table table = catalog.createTable(tableIdentifier, SCHEMA, PartitionSpec.unpartitioned());
- Table historyTable = catalog.loadTable(TableIdentifier.of("db", "history_test", "history"));
-
- List<SimpleRecord> records = Lists.newArrayList(new SimpleRecord(1, "1"));
- Dataset<Row> inputDf = spark.createDataFrame(records, SimpleRecord.class);
-
- inputDf.select("id", "data").write()
- .format("iceberg")
- .mode("append")
- .save(tableIdentifier.toString());
-
- table.refresh();
- long firstSnapshotTimestamp = table.currentSnapshot().timestampMillis();
- long firstSnapshotId = table.currentSnapshot().snapshotId();
-
- inputDf.select("id", "data").write()
- .format("iceberg")
- .mode("append")
- .save(tableIdentifier.toString());
-
- table.refresh();
- long secondSnapshotTimestamp = table.currentSnapshot().timestampMillis();
- long secondSnapshotId = table.currentSnapshot().snapshotId();
-
- // rollback the table state to the first snapshot
- table.rollback().toSnapshotId(firstSnapshotId).commit();
- long rollbackTimestamp = Iterables.getLast(table.history()).timestampMillis();
-
- inputDf.select("id", "data").write()
- .format("iceberg")
- .mode("append")
- .save(tableIdentifier.toString());
-
- table.refresh();
- long thirdSnapshotTimestamp = table.currentSnapshot().timestampMillis();
- long thirdSnapshotId = table.currentSnapshot().snapshotId();
-
- List<Row> actual = spark.read()
- .format("iceberg")
- .load("db.history_test.history")
- .collectAsList();
-
- GenericRecordBuilder builder = new GenericRecordBuilder(AvroSchemaUtil.convert(historyTable.schema(), "history"));
- List<GenericData.Record> expected = Lists.newArrayList(
- builder.set("made_current_at", firstSnapshotTimestamp * 1000)
- .set("snapshot_id", firstSnapshotId)
- .set("parent_id", null)
- .set("is_current_ancestor", true)
- .build(),
- builder.set("made_current_at", secondSnapshotTimestamp * 1000)
- .set("snapshot_id", secondSnapshotId)
- .set("parent_id", firstSnapshotId)
- .set("is_current_ancestor", false) // commit rolled back, not an ancestor of the current table state
- .build(),
- builder.set("made_current_at", rollbackTimestamp * 1000)
- .set("snapshot_id", firstSnapshotId)
- .set("parent_id", null)
- .set("is_current_ancestor", true)
- .build(),
- builder.set("made_current_at", thirdSnapshotTimestamp * 1000)
- .set("snapshot_id", thirdSnapshotId)
- .set("parent_id", firstSnapshotId)
- .set("is_current_ancestor", true)
- .build()
- );
-
- Assert.assertEquals("History table should have a row for each commit", 4, actual.size());
- TestHelpers.assertEqualsSafe(historyTable.schema().asStruct(), expected.get(0), actual.get(0));
- TestHelpers.assertEqualsSafe(historyTable.schema().asStruct(), expected.get(1), actual.get(1));
- TestHelpers.assertEqualsSafe(historyTable.schema().asStruct(), expected.get(2), actual.get(2));
-
- } finally {
- clients.run(client -> {
- client.dropTable(tableIdentifier.namespace().level(0), tableIdentifier.name());
- return null;
- });
- }
- }
-
- @Test
- public synchronized void testHiveSnapshotsTable() throws Exception {
- TableIdentifier tableIdentifier = TableIdentifier.of("db", "snapshots_test");
- try {
- Table table = catalog.createTable(tableIdentifier, SCHEMA, PartitionSpec.unpartitioned());
- Table snapTable = catalog.loadTable(TableIdentifier.of("db", "snapshots_test", "snapshots"));
-
- List<SimpleRecord> records = Lists.newArrayList(new SimpleRecord(1, "1"));
- Dataset<Row> inputDf = spark.createDataFrame(records, SimpleRecord.class);
-
- inputDf.select("id", "data").write()
- .format("iceberg")
- .mode("append")
- .save(tableIdentifier.toString());
-
- table.refresh();
- long firstSnapshotTimestamp = table.currentSnapshot().timestampMillis();
- long firstSnapshotId = table.currentSnapshot().snapshotId();
- String firstManifestList = table.currentSnapshot().manifestListLocation();
-
- table.newDelete().deleteFromRowFilter(Expressions.alwaysTrue()).commit();
-
- long secondSnapshotTimestamp = table.currentSnapshot().timestampMillis();
- long secondSnapshotId = table.currentSnapshot().snapshotId();
- String secondManifestList = table.currentSnapshot().manifestListLocation();
-
- // rollback the table state to the first snapshot
- table.rollback().toSnapshotId(firstSnapshotId).commit();
-
- List<Row> actual = spark.read()
- .format("iceberg")
- .load("db.snapshots_test.snapshots")
- .collectAsList();
-
- GenericRecordBuilder builder = new GenericRecordBuilder(AvroSchemaUtil.convert(snapTable.schema(), "snapshots"));
- List<GenericData.Record> expected = Lists.newArrayList(
- builder.set("committed_at", firstSnapshotTimestamp * 1000)
- .set("snapshot_id", firstSnapshotId)
- .set("parent_id", null)
- .set("operation", "append")
- .set("manifest_list", firstManifestList)
- .set("summary", ImmutableMap.of(
- "added-records", "1",
- "added-data-files", "1",
- "changed-partition-count", "1",
- "total-data-files", "1",
- "total-records", "1"
- ))
- .build(),
- builder.set("committed_at", secondSnapshotTimestamp * 1000)
- .set("snapshot_id", secondSnapshotId)
- .set("parent_id", firstSnapshotId)
- .set("operation", "delete")
- .set("manifest_list", secondManifestList)
- .set("summary", ImmutableMap.of(
- "deleted-records", "1",
- "deleted-data-files", "1",
- "changed-partition-count", "1",
- "total-records", "0",
- "total-data-files", "0"
- ))
- .build()
- );
-
- Assert.assertEquals("Snapshots table should have a row for each snapshot", 2, actual.size());
- TestHelpers.assertEqualsSafe(snapTable.schema().asStruct(), expected.get(0), actual.get(0));
- TestHelpers.assertEqualsSafe(snapTable.schema().asStruct(), expected.get(1), actual.get(1));
-
- } finally {
- clients.run(client -> {
- client.dropTable(tableIdentifier.namespace().level(0), tableIdentifier.name());
- return null;
- });
- }
- }
-
- @Test
- public synchronized void testHiveManifestsTable() throws Exception {
- TableIdentifier tableIdentifier = TableIdentifier.of("db", "manifests_test");
- try {
- Table table = catalog.createTable(
- tableIdentifier,
- SCHEMA,
- PartitionSpec.builderFor(SCHEMA).identity("id").build());
- Table manifestTable = catalog.loadTable(TableIdentifier.of("db", "manifests_test", "manifests"));
-
- Dataset<Row> df1 = spark.createDataFrame(Lists.newArrayList(new SimpleRecord(1, "a")), SimpleRecord.class);
-
- df1.select("id", "data").write()
- .format("iceberg")
- .mode("append")
- .save(tableIdentifier.toString());
-
- List<Row> actual = spark.read()
- .format("iceberg")
- .load("db.manifests_test.manifests")
- .collectAsList();
-
- table.refresh();
-
- GenericRecordBuilder builder = new GenericRecordBuilder(AvroSchemaUtil.convert(
- manifestTable.schema(), "manifests"));
- GenericRecordBuilder summaryBuilder = new GenericRecordBuilder(AvroSchemaUtil.convert(
- manifestTable.schema().findType("partition_summaries.element").asStructType(), "partition_summary"));
- List<GenericData.Record> expected = Lists.transform(table.currentSnapshot().manifests(), manifest ->
- builder.set("path", manifest.path())
- .set("length", manifest.length())
- .set("partition_spec_id", manifest.partitionSpecId())
- .set("added_snapshot_id", manifest.snapshotId())
- .set("added_data_files_count", manifest.addedFilesCount())
- .set("existing_data_files_count", manifest.existingFilesCount())
- .set("deleted_data_files_count", manifest.deletedFilesCount())
- .set("partition_summaries", Lists.transform(manifest.partitions(), partition ->
- summaryBuilder
- .set("contains_null", false)
- .set("lower_bound", "1")
- .set("upper_bound", "1")
- .build()
- ))
- .build()
- );
-
- Assert.assertEquals("Manifests table should have one manifest row", 1, actual.size());
- TestHelpers.assertEqualsSafe(manifestTable.schema().asStruct(), expected.get(0), actual.get(0));
-
} finally {
clients.run(client -> {
client.dropTable(tableIdentifier.namespace().level(0), tableIdentifier.name());