You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by bl...@apache.org on 2020/03/06 19:38:43 UTC

[incubator-iceberg] branch master updated: Spark: Update table tests to use common test cases (#827)

This is an automated email from the ASF dual-hosted git repository.

blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 5481125  Spark: Update table tests to use common test cases (#827)
5481125 is described below

commit 548112517032897f02f80cc564ecc5b1f82918de
Author: Jacob Lammott <ja...@gmail.com>
AuthorDate: Fri Mar 6 11:38:06 2020 -0800

    Spark: Update table tests to use common test cases (#827)
    
    Co-authored-by: Lammott <jl...@usa483e72e3960.am.sony.com>
---
 .../source/TestIcebergSourceHadoopTables.java      | 629 +----------------
 .../spark/source/TestIcebergSourceHiveTables.java  | 771 +--------------------
 ...ables.java => TestIcebergSourceTablesBase.java} | 199 +++---
 3 files changed, 142 insertions(+), 1457 deletions(-)

diff --git a/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceHadoopTables.java b/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceHadoopTables.java
index 9be496e..e1674a1 100644
--- a/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceHadoopTables.java
+++ b/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceHadoopTables.java
@@ -19,53 +19,22 @@
 
 package org.apache.iceberg.spark.source;
 
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
 import java.io.File;
-import java.util.Comparator;
-import java.util.List;
-import org.apache.avro.generic.GenericData;
-import org.apache.avro.generic.GenericRecordBuilder;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.iceberg.DataFile;
-import org.apache.iceberg.ManifestFile;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
-import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.Table;
-import org.apache.iceberg.TableProperties;
-import org.apache.iceberg.avro.Avro;
-import org.apache.iceberg.avro.AvroSchemaUtil;
-import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.hadoop.HadoopTables;
-import org.apache.iceberg.io.CloseableIterable;
-import org.apache.iceberg.io.InputFile;
-import org.apache.iceberg.spark.SparkTableUtil;
-import org.apache.iceberg.spark.data.TestHelpers;
-import org.apache.iceberg.types.Types;
-import org.apache.spark.sql.Dataset;
-import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SparkSession;
-import org.apache.spark.sql.catalyst.TableIdentifier;
 import org.junit.AfterClass;
-import org.junit.Assert;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Rule;
-import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 
-import static org.apache.iceberg.types.Types.NestedField.optional;
+public class TestIcebergSourceHadoopTables extends TestIcebergSourceTablesBase {
 
-public class TestIcebergSourceHadoopTables {
-
-  private static final Schema SCHEMA = new Schema(
-      optional(1, "id", Types.IntegerType.get()),
-      optional(2, "data", Types.StringType.get())
-  );
-
-  private static SparkSession spark;
   private static final HadoopTables TABLES = new HadoopTables(new Configuration());
 
   @BeforeClass
@@ -95,592 +64,26 @@ public class TestIcebergSourceHadoopTables {
     this.tableLocation = tableDir.toURI().toString();
   }
 
-  @Test
-  public void testEntriesTable() throws Exception {
-    Table table = TABLES.create(SCHEMA, tableLocation);
-    System.out.println(tableLocation);
-    Table entriesTable = TABLES.load(tableLocation + "#entries");
-
-    List<SimpleRecord> records = Lists.newArrayList(new SimpleRecord(1, "1"));
-
-    Dataset<Row> inputDf = spark.createDataFrame(records, SimpleRecord.class);
-    inputDf.select("id", "data").write()
-        .format("iceberg")
-        .mode("append")
-        .save(tableLocation);
-
-    table.refresh();
-
-    List<Row> actual = spark.read()
-        .format("iceberg")
-        .load(tableLocation + "#entries")
-        .collectAsList();
-
-    Assert.assertEquals("Should only contain one manifest", 1, table.currentSnapshot().manifests().size());
-    InputFile manifest = table.io().newInputFile(table.currentSnapshot().manifests().get(0).path());
-    List<GenericData.Record> expected;
-    try (CloseableIterable<GenericData.Record> rows = Avro.read(manifest).project(entriesTable.schema()).build()) {
-      expected = Lists.newArrayList(rows);
-    }
-
-    Assert.assertEquals("Entries table should have one row", 1, expected.size());
-    Assert.assertEquals("Actual results should have one row", 1, actual.size());
-    TestHelpers.assertEqualsSafe(entriesTable.schema().asStruct(), expected.get(0), actual.get(0));
-  }
-
-  @Test
-  public void testAllEntriesTable() throws Exception {
-    Table table = TABLES.create(SCHEMA, tableLocation);
-    System.out.println(tableLocation);
-    Table entriesTable = TABLES.load(tableLocation + "#all_entries");
-
-    Dataset<Row> df1 = spark.createDataFrame(Lists.newArrayList(new SimpleRecord(1, "a")), SimpleRecord.class);
-    Dataset<Row> df2 = spark.createDataFrame(Lists.newArrayList(new SimpleRecord(2, "b")), SimpleRecord.class);
-
-    df1.select("id", "data").write()
-        .format("iceberg")
-        .mode("append")
-        .save(tableLocation);
-
-    // delete the first file to test that not only live files are listed
-    table.newDelete().deleteFromRowFilter(Expressions.equal("id", 1)).commit();
-
-    // add a second file
-    df2.select("id", "data").write()
-        .format("iceberg")
-        .mode("append")
-        .save(tableLocation);
-
-    // ensure table data isn't stale
-    table.refresh();
-
-    List<Row> actual = spark.read()
-        .format("iceberg")
-        .load(tableLocation + "#all_entries")
-        .orderBy("snapshot_id")
-        .collectAsList();
-
-    List<GenericData.Record> expected = Lists.newArrayList();
-    for (ManifestFile manifest : Iterables.concat(Iterables.transform(table.snapshots(), Snapshot::manifests))) {
-      InputFile in = table.io().newInputFile(manifest.path());
-      try (CloseableIterable<GenericData.Record> rows = Avro.read(in).project(entriesTable.schema()).build()) {
-        for (GenericData.Record record : rows) {
-          expected.add(record);
-        }
-      }
-    }
-
-    expected.sort(Comparator.comparing(o -> (Long) o.get("snapshot_id")));
-
-    Assert.assertEquals("Entries table should have 3 rows", 3, expected.size());
-    Assert.assertEquals("Actual results should have 3 rows", 3, actual.size());
-    for (int i = 0; i < expected.size(); i += 1) {
-      TestHelpers.assertEqualsSafe(entriesTable.schema().asStruct(), expected.get(i), actual.get(i));
-    }
-  }
-
-  @Test
-  public void testFilesTable() throws Exception {
-    Table table = TABLES.create(SCHEMA, PartitionSpec.builderFor(SCHEMA).identity("id").build(), tableLocation);
-    Table entriesTable = TABLES.load(tableLocation + "#entries");
-    Table filesTable = TABLES.load(tableLocation + "#files");
-
-    Dataset<Row> df1 = spark.createDataFrame(Lists.newArrayList(new SimpleRecord(1, "a")), SimpleRecord.class);
-    Dataset<Row> df2 = spark.createDataFrame(Lists.newArrayList(new SimpleRecord(2, "b")), SimpleRecord.class);
-
-    df1.select("id", "data").write()
-        .format("iceberg")
-        .mode("append")
-        .save(tableLocation);
-
-    // add a second file
-    df2.select("id", "data").write()
-        .format("iceberg")
-        .mode("append")
-        .save(tableLocation);
-
-    // delete the first file to test that only live files are listed
-    table.newDelete().deleteFromRowFilter(Expressions.equal("id", 1)).commit();
-
-    List<Row> actual = spark.read()
-        .format("iceberg")
-        .load(tableLocation + "#files")
-        .collectAsList();
-
-    List<GenericData.Record> expected = Lists.newArrayList();
-    for (ManifestFile manifest : table.currentSnapshot().manifests()) {
-      InputFile in = table.io().newInputFile(manifest.path());
-      try (CloseableIterable<GenericData.Record> rows = Avro.read(in).project(entriesTable.schema()).build()) {
-        for (GenericData.Record record : rows) {
-          if ((Integer) record.get("status") < 2 /* added or existing */) {
-            expected.add((GenericData.Record) record.get("data_file"));
-          }
-        }
-      }
+  @Override
+  public Table createTable(TableIdentifier ident, Schema schema, PartitionSpec spec) {
+    if (spec.equals(PartitionSpec.unpartitioned())) {
+      return TABLES.create(schema, tableLocation);
     }
-
-    Assert.assertEquals("Files table should have one row", 1, expected.size());
-    Assert.assertEquals("Actual results should have one row", 1, actual.size());
-    TestHelpers.assertEqualsSafe(filesTable.schema().asStruct(), expected.get(0), actual.get(0));
+    return TABLES.create(schema, spec, tableLocation);
   }
 
-  @Test
-  public void testFilesTableWithSnapshotIdInheritance() throws Exception {
-    Table table = TABLES.create(SCHEMA, PartitionSpec.builderFor(SCHEMA).identity("id").build(), tableLocation);
-
-    table.updateProperties()
-        .set(TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED, "true")
-        .commit();
-
-    Table entriesTable = TABLES.load(tableLocation + "#entries");
-    Table filesTable = TABLES.load(tableLocation + "#files");
-
-    List<SimpleRecord> records = Lists.newArrayList(
-        new SimpleRecord(1, "a"),
-        new SimpleRecord(2, "b")
-     );
-
-    try {
-      Dataset<Row> inputDF = spark.createDataFrame(records, SimpleRecord.class);
-      inputDF.select("id", "data").write()
-          .format("parquet")
-          .mode("append")
-          .partitionBy("id")
-          .saveAsTable("parquet_table");
-
-      String stagingLocation = table.location() + "/metadata";
-      SparkTableUtil.importSparkTable(spark, new TableIdentifier("parquet_table"), table, stagingLocation);
-
-      List<Row> actual = spark.read()
-          .format("iceberg")
-          .load(tableLocation + "#files")
-          .collectAsList();
-
-      List<GenericData.Record> expected = Lists.newArrayList();
-      for (ManifestFile manifest : table.currentSnapshot().manifests()) {
-        InputFile in = table.io().newInputFile(manifest.path());
-        try (CloseableIterable<GenericData.Record> rows = Avro.read(in).project(entriesTable.schema()).build()) {
-          for (GenericData.Record record : rows) {
-            expected.add((GenericData.Record) record.get("data_file"));
-          }
-        }
-      }
-
-      Assert.assertEquals("Files table should have one row", 2, expected.size());
-      Assert.assertEquals("Actual results should have one row", 2, actual.size());
-      TestHelpers.assertEqualsSafe(filesTable.schema().asStruct(), expected.get(0), actual.get(0));
-      TestHelpers.assertEqualsSafe(filesTable.schema().asStruct(), expected.get(1), actual.get(1));
-
-    } finally {
-      spark.sql("DROP TABLE parquet_table");
-    }
+  @Override
+  public Table loadTable(TableIdentifier ident, String entriesSuffix) {
+    return TABLES.load(loadLocation(ident, entriesSuffix));
   }
 
-  @Test
-  public void testFilesUnpartitionedTable() throws Exception {
-    Table table = TABLES.create(SCHEMA, tableLocation);
-    Table entriesTable = TABLES.load(tableLocation + "#entries");
-    Table filesTable = TABLES.load(tableLocation + "#files");
-
-    Dataset<Row> df1 = spark.createDataFrame(Lists.newArrayList(new SimpleRecord(1, "a")), SimpleRecord.class);
-    Dataset<Row> df2 = spark.createDataFrame(Lists.newArrayList(new SimpleRecord(2, "b")), SimpleRecord.class);
-
-    df1.select("id", "data").write()
-        .format("iceberg")
-        .mode("append")
-        .save(tableLocation);
-
-    table.refresh();
-    DataFile toDelete = Iterables.getOnlyElement(table.currentSnapshot().addedFiles());
-
-    // add a second file
-    df2.select("id", "data").write()
-        .format("iceberg")
-        .mode("append")
-        .save(tableLocation);
-
-    // delete the first file to test that only live files are listed
-    table.newDelete().deleteFile(toDelete).commit();
-
-    List<Row> actual = spark.read()
-        .format("iceberg")
-        .load(tableLocation + "#files")
-        .collectAsList();
-
-    List<GenericData.Record> expected = Lists.newArrayList();
-    for (ManifestFile manifest : table.currentSnapshot().manifests()) {
-      InputFile in = table.io().newInputFile(manifest.path());
-      try (CloseableIterable<GenericData.Record> rows = Avro.read(in).project(entriesTable.schema()).build()) {
-        for (GenericData.Record record : rows) {
-          if ((Integer) record.get("status") < 2 /* added or existing */) {
-            expected.add((GenericData.Record) record.get("data_file"));
-          }
-        }
-      }
-    }
-
-    Assert.assertEquals("Files table should have one row", 1, expected.size());
-    Assert.assertEquals("Actual results should have one row", 1, actual.size());
-    TestHelpers.assertEqualsSafe(filesTable.schema().asStruct(), expected.get(0), actual.get(0));
+  @Override
+  public String loadLocation(TableIdentifier ident, String entriesSuffix) {
+    return String.format("%s#%s", loadLocation(ident), entriesSuffix);
   }
 
-  @Test
-  public void testAllDataFilesTable() throws Exception {
-    Table table = TABLES.create(SCHEMA, PartitionSpec.builderFor(SCHEMA).identity("id").build(), tableLocation);
-    Table entriesTable = TABLES.load(tableLocation + "#entries");
-    Table filesTable = TABLES.load(tableLocation + "#all_data_files");
-
-    Dataset<Row> df1 = spark.createDataFrame(Lists.newArrayList(new SimpleRecord(1, "a")), SimpleRecord.class);
-    Dataset<Row> df2 = spark.createDataFrame(Lists.newArrayList(new SimpleRecord(2, "b")), SimpleRecord.class);
-
-    df1.select("id", "data").write()
-        .format("iceberg")
-        .mode("append")
-        .save(tableLocation);
-
-    // delete the first file to test that not only live files are listed
-    table.newDelete().deleteFromRowFilter(Expressions.equal("id", 1)).commit();
-
-    // add a second file
-    df2.select("id", "data").write()
-        .format("iceberg")
-        .mode("append")
-        .save(tableLocation);
-
-    // ensure table data isn't stale
-    table.refresh();
-
-    List<Row> actual = spark.read()
-        .format("iceberg")
-        .load(tableLocation + "#all_data_files")
-        .orderBy("file_path")
-        .collectAsList();
-    actual.sort(Comparator.comparing(o -> o.getString(0)));
-
-    List<GenericData.Record> expected = Lists.newArrayList();
-    for (ManifestFile manifest : Iterables.concat(Iterables.transform(table.snapshots(), Snapshot::manifests))) {
-      InputFile in = table.io().newInputFile(manifest.path());
-      try (CloseableIterable<GenericData.Record> rows = Avro.read(in).project(entriesTable.schema()).build()) {
-        for (GenericData.Record record : rows) {
-          if ((Integer) record.get("status") < 2 /* added or existing */) {
-            expected.add((GenericData.Record) record.get("data_file"));
-          }
-        }
-      }
-    }
-
-    expected.sort(Comparator.comparing(o -> o.get("file_path").toString()));
-
-    Assert.assertEquals("Files table should have two rows", 2, expected.size());
-    Assert.assertEquals("Actual results should have two rows", 2, actual.size());
-    for (int i = 0; i < expected.size(); i += 1) {
-      TestHelpers.assertEqualsSafe(filesTable.schema().asStruct(), expected.get(i), actual.get(i));
-    }
-  }
-
-  @Test
-  public void testHistoryTable() {
-    Table table = TABLES.create(SCHEMA, tableLocation);
-    Table historyTable = TABLES.load(tableLocation + "#history");
-
-    List<SimpleRecord> records = Lists.newArrayList(new SimpleRecord(1, "1"));
-    Dataset<Row> inputDf = spark.createDataFrame(records, SimpleRecord.class);
-
-    inputDf.select("id", "data").write()
-        .format("iceberg")
-        .mode("append")
-        .save(tableLocation);
-
-    table.refresh();
-    long firstSnapshotTimestamp = table.currentSnapshot().timestampMillis();
-    long firstSnapshotId = table.currentSnapshot().snapshotId();
-
-    inputDf.select("id", "data").write()
-        .format("iceberg")
-        .mode("append")
-        .save(tableLocation);
-
-    table.refresh();
-    long secondSnapshotTimestamp = table.currentSnapshot().timestampMillis();
-    long secondSnapshotId = table.currentSnapshot().snapshotId();
-
-    // rollback the table state to the first snapshot
-    table.rollback().toSnapshotId(firstSnapshotId).commit();
-    long rollbackTimestamp = Iterables.getLast(table.history()).timestampMillis();
-
-    inputDf.select("id", "data").write()
-        .format("iceberg")
-        .mode("append")
-        .save(tableLocation);
-
-    table.refresh();
-    long thirdSnapshotTimestamp = table.currentSnapshot().timestampMillis();
-    long thirdSnapshotId = table.currentSnapshot().snapshotId();
-
-    List<Row> actual = spark.read()
-        .format("iceberg")
-        .load(tableLocation + "#history")
-        .collectAsList();
-
-    GenericRecordBuilder builder = new GenericRecordBuilder(AvroSchemaUtil.convert(historyTable.schema(), "history"));
-    List<GenericData.Record> expected = Lists.newArrayList(
-        builder.set("made_current_at", firstSnapshotTimestamp * 1000)
-            .set("snapshot_id", firstSnapshotId)
-            .set("parent_id", null)
-            .set("is_current_ancestor", true)
-            .build(),
-        builder.set("made_current_at", secondSnapshotTimestamp * 1000)
-            .set("snapshot_id", secondSnapshotId)
-            .set("parent_id", firstSnapshotId)
-            .set("is_current_ancestor", false) // commit rolled back, not an ancestor of the current table state
-            .build(),
-        builder.set("made_current_at", rollbackTimestamp * 1000)
-            .set("snapshot_id", firstSnapshotId)
-            .set("parent_id", null)
-            .set("is_current_ancestor", true)
-            .build(),
-        builder.set("made_current_at", thirdSnapshotTimestamp * 1000)
-            .set("snapshot_id", thirdSnapshotId)
-            .set("parent_id", firstSnapshotId)
-            .set("is_current_ancestor", true)
-            .build()
-    );
-
-    Assert.assertEquals("History table should have a row for each commit", 4, actual.size());
-    TestHelpers.assertEqualsSafe(historyTable.schema().asStruct(), expected.get(0), actual.get(0));
-    TestHelpers.assertEqualsSafe(historyTable.schema().asStruct(), expected.get(1), actual.get(1));
-    TestHelpers.assertEqualsSafe(historyTable.schema().asStruct(), expected.get(2), actual.get(2));
-  }
-
-  @Test
-  public void testSnapshotsTable() {
-    Table table = TABLES.create(SCHEMA, tableLocation);
-    Table snapTable = TABLES.load(tableLocation + "#snapshots");
-
-    List<SimpleRecord> records = Lists.newArrayList(new SimpleRecord(1, "1"));
-    Dataset<Row> inputDf = spark.createDataFrame(records, SimpleRecord.class);
-
-    inputDf.select("id", "data").write()
-        .format("iceberg")
-        .mode("append")
-        .save(tableLocation);
-
-    table.refresh();
-    long firstSnapshotTimestamp = table.currentSnapshot().timestampMillis();
-    long firstSnapshotId = table.currentSnapshot().snapshotId();
-    String firstManifestList = table.currentSnapshot().manifestListLocation();
-
-    table.newDelete().deleteFromRowFilter(Expressions.alwaysTrue()).commit();
-
-    long secondSnapshotTimestamp = table.currentSnapshot().timestampMillis();
-    long secondSnapshotId = table.currentSnapshot().snapshotId();
-    String secondManifestList = table.currentSnapshot().manifestListLocation();
-
-    // rollback the table state to the first snapshot
-    table.rollback().toSnapshotId(firstSnapshotId).commit();
-
-    List<Row> actual = spark.read()
-        .format("iceberg")
-        .load(tableLocation + "#snapshots")
-        .collectAsList();
-
-    GenericRecordBuilder builder = new GenericRecordBuilder(AvroSchemaUtil.convert(snapTable.schema(), "snapshots"));
-    List<GenericData.Record> expected = Lists.newArrayList(
-        builder.set("committed_at", firstSnapshotTimestamp * 1000)
-            .set("snapshot_id", firstSnapshotId)
-            .set("parent_id", null)
-            .set("operation", "append")
-            .set("manifest_list", firstManifestList)
-            .set("summary", ImmutableMap.of(
-                "added-records", "1",
-                "added-data-files", "1",
-                "changed-partition-count", "1",
-                "total-data-files", "1",
-                "total-records", "1"
-            ))
-            .build(),
-        builder.set("committed_at", secondSnapshotTimestamp * 1000)
-            .set("snapshot_id", secondSnapshotId)
-            .set("parent_id", firstSnapshotId)
-            .set("operation", "delete")
-            .set("manifest_list", secondManifestList)
-            .set("summary", ImmutableMap.of(
-                "deleted-records", "1",
-                "deleted-data-files", "1",
-                "changed-partition-count", "1",
-                "total-records", "0",
-                "total-data-files", "0"
-            ))
-            .build()
-    );
-
-    Assert.assertEquals("Snapshots table should have a row for each snapshot", 2, actual.size());
-    TestHelpers.assertEqualsSafe(snapTable.schema().asStruct(), expected.get(0), actual.get(0));
-    TestHelpers.assertEqualsSafe(snapTable.schema().asStruct(), expected.get(1), actual.get(1));
-  }
-
-  @Test
-  public void testManifestsTable() {
-    Table table = TABLES.create(SCHEMA, PartitionSpec.builderFor(SCHEMA).identity("id").build(), tableLocation);
-    Table manifestTable = TABLES.load(tableLocation + "#manifests");
-
-    Dataset<Row> df1 = spark.createDataFrame(Lists.newArrayList(new SimpleRecord(1, "a")), SimpleRecord.class);
-
-    df1.select("id", "data").write()
-        .format("iceberg")
-        .mode("append")
-        .save(tableLocation);
-
-    List<Row> actual = spark.read()
-        .format("iceberg")
-        .load(tableLocation + "#manifests")
-        .collectAsList();
-
-    table.refresh();
-
-    GenericRecordBuilder builder = new GenericRecordBuilder(AvroSchemaUtil.convert(
-        manifestTable.schema(), "manifests"));
-    GenericRecordBuilder summaryBuilder = new GenericRecordBuilder(AvroSchemaUtil.convert(
-        manifestTable.schema().findType("partition_summaries.element").asStructType(), "partition_summary"));
-    List<GenericData.Record> expected = Lists.transform(table.currentSnapshot().manifests(), manifest ->
-        builder.set("path", manifest.path())
-            .set("length", manifest.length())
-            .set("partition_spec_id", manifest.partitionSpecId())
-            .set("added_snapshot_id", manifest.snapshotId())
-            .set("added_data_files_count", manifest.addedFilesCount())
-            .set("existing_data_files_count", manifest.existingFilesCount())
-            .set("deleted_data_files_count", manifest.deletedFilesCount())
-            .set("partition_summaries", Lists.transform(manifest.partitions(), partition ->
-                summaryBuilder
-                    .set("contains_null", false)
-                    .set("lower_bound", "1")
-                    .set("upper_bound", "1")
-                    .build()
-                ))
-            .build()
-    );
-
-    Assert.assertEquals("Manifests table should have one manifest row", 1, actual.size());
-    TestHelpers.assertEqualsSafe(manifestTable.schema().asStruct(), expected.get(0), actual.get(0));
-  }
-
-  @Test
-  public void testAllManifestsTable() {
-    Table table = TABLES.create(SCHEMA, PartitionSpec.builderFor(SCHEMA).identity("id").build(), tableLocation);
-    Table manifestTable = TABLES.load(tableLocation + "#all_manifests");
-
-    Dataset<Row> df1 = spark.createDataFrame(Lists.newArrayList(new SimpleRecord(1, "a")), SimpleRecord.class);
-
-    List<ManifestFile> manifests = Lists.newArrayList();
-
-    df1.select("id", "data").write()
-        .format("iceberg")
-        .mode("append")
-        .save(tableLocation);
-
-    manifests.addAll(table.currentSnapshot().manifests());
-
-    table.newDelete().deleteFromRowFilter(Expressions.alwaysTrue()).commit();
-
-    manifests.addAll(table.currentSnapshot().manifests());
-
-    List<Row> actual = spark.read()
-        .format("iceberg")
-        .load(tableLocation + "#all_manifests")
-        .orderBy("path")
-        .collectAsList();
-
-    table.refresh();
-
-    GenericRecordBuilder builder = new GenericRecordBuilder(AvroSchemaUtil.convert(
-        manifestTable.schema(), "manifests"));
-    GenericRecordBuilder summaryBuilder = new GenericRecordBuilder(AvroSchemaUtil.convert(
-        manifestTable.schema().findType("partition_summaries.element").asStructType(), "partition_summary"));
-    List<GenericData.Record> expected = Lists.newArrayList(Iterables.transform(manifests, manifest ->
-        builder.set("path", manifest.path())
-            .set("length", manifest.length())
-            .set("partition_spec_id", manifest.partitionSpecId())
-            .set("added_snapshot_id", manifest.snapshotId())
-            .set("added_data_files_count", manifest.addedFilesCount())
-            .set("existing_data_files_count", manifest.existingFilesCount())
-            .set("deleted_data_files_count", manifest.deletedFilesCount())
-            .set("partition_summaries", Lists.transform(manifest.partitions(), partition ->
-                summaryBuilder
-                    .set("contains_null", false)
-                    .set("lower_bound", "1")
-                    .set("upper_bound", "1")
-                    .build()
-            ))
-            .build()
-    ));
-
-    expected.sort(Comparator.comparing(o -> o.get("path").toString()));
-
-    Assert.assertEquals("Manifests table should have two manifest rows", 2, actual.size());
-    for (int i = 0; i < expected.size(); i += 1) {
-      TestHelpers.assertEqualsSafe(manifestTable.schema().asStruct(), expected.get(i), actual.get(i));
-    }
-  }
-
-  @Test
-  public void testPartitionsTable() {
-    Table table = TABLES.create(SCHEMA, PartitionSpec.builderFor(SCHEMA).identity("id").build(), tableLocation);
-    Table partitionsTable = TABLES.load(tableLocation + "#partitions");
-
-    Dataset<Row> df1 = spark.createDataFrame(Lists.newArrayList(new SimpleRecord(1, "a")), SimpleRecord.class);
-    Dataset<Row> df2 = spark.createDataFrame(Lists.newArrayList(new SimpleRecord(2, "b")), SimpleRecord.class);
-
-    df1.select("id", "data").write()
-        .format("iceberg")
-        .mode("append")
-        .save(tableLocation);
-
-    table.refresh();
-    long firstCommitId = table.currentSnapshot().snapshotId();
-
-    // add a second file
-    df2.select("id", "data").write()
-        .format("iceberg")
-        .mode("append")
-        .save(tableLocation);
-
-    List<Row> actual = spark.read()
-        .format("iceberg")
-        .load(tableLocation + "#partitions")
-        .orderBy("partition.id")
-        .collectAsList();
-
-    GenericRecordBuilder builder = new GenericRecordBuilder(AvroSchemaUtil.convert(
-        partitionsTable.schema(), "partitions"));
-    GenericRecordBuilder partitionBuilder = new GenericRecordBuilder(AvroSchemaUtil.convert(
-        partitionsTable.schema().findType("partition").asStructType(), "partition"));
-    List<GenericData.Record> expected = Lists.newArrayList();
-    expected.add(builder
-        .set("partition", partitionBuilder.set("id", 1).build())
-        .set("record_count", 1L)
-        .set("file_count", 1)
-        .build());
-    expected.add(builder
-        .set("partition", partitionBuilder.set("id", 2).build())
-        .set("record_count", 1L)
-        .set("file_count", 1)
-        .build());
-
-    Assert.assertEquals("Partitions table should have two rows", 2, expected.size());
-    Assert.assertEquals("Actual results should have two rows", 2, actual.size());
-    for (int i = 0; i < 2; i += 1) {
-      TestHelpers.assertEqualsSafe(partitionsTable.schema().asStruct(), expected.get(i), actual.get(i));
-    }
-
-    // check time travel
-    List<Row> actualAfterFirstCommit = spark.read()
-        .format("iceberg")
-        .option("snapshot-id", String.valueOf(firstCommitId))
-        .load(tableLocation + "#partitions")
-        .orderBy("partition.id")
-        .collectAsList();
-
-    Assert.assertEquals("Actual results should have one row", 1, actualAfterFirstCommit.size());
-    TestHelpers.assertEqualsSafe(partitionsTable.schema().asStruct(), expected.get(0), actualAfterFirstCommit.get(0));
+  @Override
+  public String loadLocation(TableIdentifier ident) {
+    return tableLocation;
   }
 }
diff --git a/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceHiveTables.java b/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceHiveTables.java
index 8d47a02..669c876 100644
--- a/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceHiveTables.java
+++ b/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceHiveTables.java
@@ -19,60 +19,30 @@
 
 package org.apache.iceberg.spark.source;
 
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
-import java.util.Comparator;
 import java.util.HashMap;
-import java.util.List;
-import org.apache.avro.generic.GenericData;
-import org.apache.avro.generic.GenericRecordBuilder;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.api.Database;
-import org.apache.iceberg.DataFile;
-import org.apache.iceberg.ManifestFile;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
-import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.Table;
-import org.apache.iceberg.TableProperties;
-import org.apache.iceberg.avro.Avro;
-import org.apache.iceberg.avro.AvroSchemaUtil;
 import org.apache.iceberg.catalog.TableIdentifier;
-import org.apache.iceberg.expressions.Expressions;
 import org.apache.iceberg.hive.HiveCatalog;
 import org.apache.iceberg.hive.HiveClientPool;
 import org.apache.iceberg.hive.TestHiveMetastore;
-import org.apache.iceberg.io.CloseableIterable;
-import org.apache.iceberg.io.InputFile;
-import org.apache.iceberg.spark.SparkTableUtil;
-import org.apache.iceberg.spark.data.TestHelpers;
-import org.apache.iceberg.types.Types;
-import org.apache.spark.sql.Dataset;
-import org.apache.spark.sql.Encoders;
-import org.apache.spark.sql.Row;
-import org.apache.spark.sql.SaveMode;
 import org.apache.spark.sql.SparkSession;
+import org.junit.After;
 import org.junit.AfterClass;
-import org.junit.Assert;
 import org.junit.BeforeClass;
-import org.junit.Test;
 
 import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.METASTOREURIS;
-import static org.apache.iceberg.types.Types.NestedField.optional;
 
-public class TestIcebergSourceHiveTables {
+public class TestIcebergSourceHiveTables extends TestIcebergSourceTablesBase {
 
-  private static final Schema SCHEMA = new Schema(
-      optional(1, "id", Types.IntegerType.get()),
-      optional(2, "data", Types.StringType.get())
-  );
-
-  private static SparkSession spark;
   private static TestHiveMetastore metastore;
   private static HiveClientPool clients;
   private static HiveConf hiveConf;
   private static HiveCatalog catalog;
+  private static TableIdentifier currentIdentifier;
 
   @BeforeClass
   public static void startMetastoreAndSpark() throws Exception {
@@ -107,727 +77,34 @@ public class TestIcebergSourceHiveTables {
     TestIcebergSourceHiveTables.spark = null;
   }
 
-  @Test
-  public synchronized void testHiveTablesSupport() throws Exception {
-    TableIdentifier tableIdentifier = TableIdentifier.of("db", "table");
-    try {
-      catalog.createTable(tableIdentifier, SCHEMA, PartitionSpec.unpartitioned());
-
-      List<SimpleRecord> expectedRecords = Lists.newArrayList(
-          new SimpleRecord(1, "1"),
-          new SimpleRecord(2, "2"),
-          new SimpleRecord(3, "3"));
-
-      Dataset<Row> inputDf = spark.createDataFrame(expectedRecords, SimpleRecord.class);
-      inputDf.select("id", "data").write()
-          .format("iceberg")
-          .mode(SaveMode.Append)
-          .save(tableIdentifier.toString());
-
-      Dataset<Row> resultDf = spark.read()
-          .format("iceberg")
-          .load(tableIdentifier.toString());
-      List<SimpleRecord> actualRecords = resultDf.orderBy("id")
-          .as(Encoders.bean(SimpleRecord.class))
-          .collectAsList();
-
-      Assert.assertEquals("Records should match", expectedRecords, actualRecords);
-    } finally {
-      clients.run(client -> {
-        client.dropTable(tableIdentifier.namespace().level(0), tableIdentifier.name());
-        return null;
-      });
-    }
-  }
-
-  @Test
-  public synchronized void testHiveEntriesTable() throws Exception {
-    TableIdentifier tableIdentifier = TableIdentifier.of("db", "entries_test");
-    try {
-      Table table = catalog.createTable(tableIdentifier, SCHEMA, PartitionSpec.unpartitioned());
-      Table entriesTable = catalog.loadTable(TableIdentifier.of("db", "entries_test", "entries"));
-
-      List<SimpleRecord> records = Lists.newArrayList(new SimpleRecord(1, "1"));
-
-      Dataset<Row> inputDf = spark.createDataFrame(records, SimpleRecord.class);
-      inputDf.select("id", "data").write()
-          .format("iceberg")
-          .mode("append")
-          .save(tableIdentifier.toString());
-
-      table.refresh();
-
-      List<Row> actual = spark.read()
-          .format("iceberg")
-          .load("db.entries_test.entries")
-          .collectAsList();
-
-      Assert.assertEquals("Should only contain one manifest", 1, table.currentSnapshot().manifests().size());
-      InputFile manifest = table.io().newInputFile(table.currentSnapshot().manifests().get(0).path());
-      List<GenericData.Record> expected;
-      try (CloseableIterable<GenericData.Record> rows = Avro.read(manifest).project(entriesTable.schema()).build()) {
-        expected = Lists.newArrayList(rows);
-      }
-
-      Assert.assertEquals("Entries table should have one row", 1, expected.size());
-      Assert.assertEquals("Actual results should have one row", 1, actual.size());
-      TestHelpers.assertEqualsSafe(entriesTable.schema().asStruct(), expected.get(0), actual.get(0));
-
-    } finally {
-      clients.run(client -> {
-        client.dropTable(tableIdentifier.namespace().level(0), tableIdentifier.name());
-        return null;
-      });
-    }
-  }
-
-  @Test
-  public synchronized void testHiveAllEntriesTable() throws Exception {
-    TableIdentifier tableIdentifier = TableIdentifier.of("db", "entries_test");
-    try {
-      Table table = catalog.createTable(tableIdentifier, SCHEMA, PartitionSpec.unpartitioned());
-      Table entriesTable = catalog.loadTable(TableIdentifier.of("db", "entries_test", "all_entries"));
-
-      Dataset<Row> df1 = spark.createDataFrame(Lists.newArrayList(new SimpleRecord(1, "a")), SimpleRecord.class);
-      Dataset<Row> df2 = spark.createDataFrame(Lists.newArrayList(new SimpleRecord(2, "b")), SimpleRecord.class);
-
-      df1.select("id", "data").write()
-          .format("iceberg")
-          .mode("append")
-          .save(tableIdentifier.toString());
-
-      // delete the first file to test that not only live files are listed
-      table.newDelete().deleteFromRowFilter(Expressions.equal("id", 1)).commit();
-
-      // add a second file
-      df2.select("id", "data").write()
-          .format("iceberg")
-          .mode("append")
-          .save(tableIdentifier.toString());
-
-      // ensure table data isn't stale
-      table.refresh();
-
-      List<Row> actual = spark.read()
-          .format("iceberg")
-          .load("db.entries_test.all_entries")
-          .orderBy("snapshot_id")
-          .collectAsList();
-
-      List<GenericData.Record> expected = Lists.newArrayList();
-      for (ManifestFile manifest : Iterables.concat(Iterables.transform(table.snapshots(), Snapshot::manifests))) {
-        InputFile in = table.io().newInputFile(manifest.path());
-        try (CloseableIterable<GenericData.Record> rows = Avro.read(in).project(entriesTable.schema()).build()) {
-          for (GenericData.Record record : rows) {
-            expected.add(record);
-          }
-        }
-      }
-
-      expected.sort(Comparator.comparing(o -> (Long) o.get("snapshot_id")));
-
-      Assert.assertEquals("Entries table should have 3 rows", 3, expected.size());
-      Assert.assertEquals("Actual results should have 3 rows", 3, actual.size());
-      for (int i = 0; i < expected.size(); i += 1) {
-        TestHelpers.assertEqualsSafe(entriesTable.schema().asStruct(), expected.get(i), actual.get(i));
-      }
-
-    } finally {
-      clients.run(client -> {
-        client.dropTable(tableIdentifier.namespace().level(0), tableIdentifier.name());
-        return null;
-      });
-    }
-  }
-
-  @Test
-  public synchronized void testHiveFilesTable() throws Exception {
-    TableIdentifier tableIdentifier = TableIdentifier.of("db", "files_test");
-    try {
-      Table table = catalog.createTable(tableIdentifier, SCHEMA,
-          PartitionSpec.builderFor(SCHEMA).identity("id").build());
-      Table entriesTable = catalog.loadTable(TableIdentifier.of("db", "files_test", "entries"));
-      Table filesTable = catalog.loadTable(TableIdentifier.of("db", "files_test", "files"));
-
-      Dataset<Row> df1 = spark.createDataFrame(Lists.newArrayList(new SimpleRecord(1, "a")), SimpleRecord.class);
-      Dataset<Row> df2 = spark.createDataFrame(Lists.newArrayList(new SimpleRecord(2, "b")), SimpleRecord.class);
-
-      df1.select("id", "data").write()
-          .format("iceberg")
-          .mode("append")
-          .save(tableIdentifier.toString());
-
-      // add a second file
-      df2.select("id", "data").write()
-          .format("iceberg")
-          .mode("append")
-          .save(tableIdentifier.toString());
-
-      // delete the first file to test that only live files are listed
-      table.newDelete().deleteFromRowFilter(Expressions.equal("id", 1)).commit();
-
-      List<Row> actual = spark.read()
-          .format("iceberg")
-          .load("db.files_test.files")
-          .collectAsList();
-
-      List<GenericData.Record> expected = Lists.newArrayList();
-      for (ManifestFile manifest : table.currentSnapshot().manifests()) {
-        InputFile in = table.io().newInputFile(manifest.path());
-        try (CloseableIterable<GenericData.Record> rows = Avro.read(in).project(entriesTable.schema()).build()) {
-          for (GenericData.Record record : rows) {
-            if ((Integer) record.get("status") < 2 /* added or existing */) {
-              expected.add((GenericData.Record) record.get("data_file"));
-            }
-          }
-        }
-      }
-
-      Assert.assertEquals("Files table should have one row", 1, expected.size());
-      Assert.assertEquals("Actual results should have one row", 1, actual.size());
-      TestHelpers.assertEqualsSafe(filesTable.schema().asStruct(), expected.get(0), actual.get(0));
-
-    } finally {
-      clients.run(client -> {
-        client.dropTable(tableIdentifier.namespace().level(0), tableIdentifier.name());
-        return null;
-      });
-    }
-  }
-
-  @Test
-  public synchronized void testHiveFilesTableWithSnapshotIdInheritance() throws Exception {
-    TableIdentifier tableIdentifier = TableIdentifier.of("db", "files_inheritance_test");
-    try {
-      PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).identity("id").build();
-      Table table = catalog.createTable(tableIdentifier, SCHEMA, spec);
-
-      table.updateProperties()
-          .set(TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED, "true")
-          .commit();
-
-      Table entriesTable = catalog.loadTable(TableIdentifier.of("db", "files_inheritance_test", "entries"));
-      Table filesTable = catalog.loadTable(TableIdentifier.of("db", "files_inheritance_test", "files"));
-
-      List<SimpleRecord> records = Lists.newArrayList(
-          new SimpleRecord(1, "a"),
-          new SimpleRecord(2, "b")
-      );
-
-      Dataset<Row> inputDF = spark.createDataFrame(records, SimpleRecord.class);
-      inputDF.select("id", "data").write()
-          .format("parquet")
-          .mode("append")
-          .partitionBy("id")
-          .saveAsTable("parquet_table");
-
-      String stagingLocation = table.location() + "/metadata";
-      SparkTableUtil.importSparkTable(
-          spark, new org.apache.spark.sql.catalyst.TableIdentifier("parquet_table"), table, stagingLocation);
-
-      List<Row> actual = spark.read()
-          .format("iceberg")
-          .load(tableIdentifier + ".files")
-          .collectAsList();
-
-      List<GenericData.Record> expected = Lists.newArrayList();
-      for (ManifestFile manifest : table.currentSnapshot().manifests()) {
-        InputFile in = table.io().newInputFile(manifest.path());
-        try (CloseableIterable<GenericData.Record> rows = Avro.read(in).project(entriesTable.schema()).build()) {
-          for (GenericData.Record record : rows) {
-            expected.add((GenericData.Record) record.get("data_file"));
-          }
-        }
-      }
-
-      Assert.assertEquals("Files table should have one row", 2, expected.size());
-      Assert.assertEquals("Actual results should have one row", 2, actual.size());
-      TestHelpers.assertEqualsSafe(filesTable.schema().asStruct(), expected.get(0), actual.get(0));
-      TestHelpers.assertEqualsSafe(filesTable.schema().asStruct(), expected.get(1), actual.get(1));
-
-    } finally {
-      spark.sql("DROP TABLE parquet_table");
-      clients.run(client -> {
-        client.dropTable(tableIdentifier.namespace().level(0), tableIdentifier.name());
-        return null;
-      });
-    }
-  }
-
-  @Test
-  public synchronized void testHiveFilesUnpartitionedTable() throws Exception {
-    TableIdentifier tableIdentifier = TableIdentifier.of("db", "unpartitioned_files_test");
-    try {
-      Table table = catalog.createTable(tableIdentifier, SCHEMA);
-      Table entriesTable = catalog.loadTable(TableIdentifier.of("db", "unpartitioned_files_test", "entries"));
-      Table filesTable = catalog.loadTable(TableIdentifier.of("db", "unpartitioned_files_test", "files"));
-
-      Dataset<Row> df1 = spark.createDataFrame(Lists.newArrayList(new SimpleRecord(1, "a")), SimpleRecord.class);
-      Dataset<Row> df2 = spark.createDataFrame(Lists.newArrayList(new SimpleRecord(2, "b")), SimpleRecord.class);
-
-      df1.select("id", "data").write()
-          .format("iceberg")
-          .mode("append")
-          .save(tableIdentifier.toString());
-
-      table.refresh();
-      DataFile toDelete = Iterables.getOnlyElement(table.currentSnapshot().addedFiles());
-
-      // add a second file
-      df2.select("id", "data").write()
-          .format("iceberg")
-          .mode("append")
-          .save(tableIdentifier.toString());
-
-      // delete the first file to test that only live files are listed
-      table.newDelete().deleteFile(toDelete).commit();
-
-      List<Row> actual = spark.read()
-          .format("iceberg")
-          .load("db.unpartitioned_files_test.files")
-          .collectAsList();
-
-      List<GenericData.Record> expected = Lists.newArrayList();
-      for (ManifestFile manifest : table.currentSnapshot().manifests()) {
-        InputFile in = table.io().newInputFile(manifest.path());
-        try (CloseableIterable<GenericData.Record> rows = Avro.read(in).project(entriesTable.schema()).build()) {
-          for (GenericData.Record record : rows) {
-            if ((Integer) record.get("status") < 2 /* added or existing */) {
-              expected.add((GenericData.Record) record.get("data_file"));
-            }
-          }
-        }
-      }
-
-      Assert.assertEquals("Files table should have one row", 1, expected.size());
-      Assert.assertEquals("Actual results should have one row", 1, actual.size());
-      TestHelpers.assertEqualsSafe(filesTable.schema().asStruct(), expected.get(0), actual.get(0));
-
-    } finally {
-      clients.run(client -> {
-        client.dropTable(tableIdentifier.namespace().level(0), tableIdentifier.name());
-        return null;
-      });
-    }
-  }
-
-  @Test
-  public synchronized void testHiveAllDataFilesTable() throws Exception {
-    TableIdentifier tableIdentifier = TableIdentifier.of("db", "files_test");
-    try {
-      Table table = catalog.createTable(tableIdentifier, SCHEMA,
-          PartitionSpec.builderFor(SCHEMA).identity("id").build());
-      Table entriesTable = catalog.loadTable(TableIdentifier.of("db", "files_test", "entries"));
-      Table filesTable = catalog.loadTable(TableIdentifier.of("db", "files_test", "all_data_files"));
-
-      Dataset<Row> df1 = spark.createDataFrame(Lists.newArrayList(new SimpleRecord(1, "a")), SimpleRecord.class);
-      Dataset<Row> df2 = spark.createDataFrame(Lists.newArrayList(new SimpleRecord(2, "b")), SimpleRecord.class);
-
-      df1.select("id", "data").write()
-          .format("iceberg")
-          .mode("append")
-          .save(tableIdentifier.toString());
-
-      // delete the first file to test that not only live files are listed
-      table.newDelete().deleteFromRowFilter(Expressions.equal("id", 1)).commit();
-
-      // add a second file
-      df2.select("id", "data").write()
-          .format("iceberg")
-          .mode("append")
-          .save(tableIdentifier.toString());
-
-      // ensure table data isn't stale
-      table.refresh();
-
-      List<Row> actual = spark.read()
-          .format("iceberg")
-          .load("db.files_test.all_data_files")
-          .orderBy("file_path")
-          .collectAsList();
-
-      List<GenericData.Record> expected = Lists.newArrayList();
-      for (ManifestFile manifest : Iterables.concat(Iterables.transform(table.snapshots(), Snapshot::manifests))) {
-        InputFile in = table.io().newInputFile(manifest.path());
-        try (CloseableIterable<GenericData.Record> rows = Avro.read(in).project(entriesTable.schema()).build()) {
-          for (GenericData.Record record : rows) {
-            if ((Integer) record.get("status") < 2 /* added or existing */) {
-              expected.add((GenericData.Record) record.get("data_file"));
-            }
-          }
-        }
-      }
-
-      expected.sort(Comparator.comparing(o -> o.get("file_path").toString()));
-
-      Assert.assertEquals("Files table should have two rows", 2, expected.size());
-      Assert.assertEquals("Actual results should have two rows", 2, actual.size());
-      for (int i = 0; i < expected.size(); i += 1) {
-        TestHelpers.assertEqualsSafe(filesTable.schema().asStruct(), expected.get(i), actual.get(i));
-      }
-
-    } finally {
-      clients.run(client -> {
-        client.dropTable(tableIdentifier.namespace().level(0), tableIdentifier.name());
-        return null;
-      });
-    }
-  }
-
-  @Test
-  public synchronized void testHiveHistoryTable() throws Exception {
-    TableIdentifier tableIdentifier = TableIdentifier.of("db", "history_test");
-    try {
-      Table table = catalog.createTable(tableIdentifier, SCHEMA, PartitionSpec.unpartitioned());
-      Table historyTable = catalog.loadTable(TableIdentifier.of("db", "history_test", "history"));
-
-      List<SimpleRecord> records = Lists.newArrayList(new SimpleRecord(1, "1"));
-      Dataset<Row> inputDf = spark.createDataFrame(records, SimpleRecord.class);
-
-      inputDf.select("id", "data").write()
-          .format("iceberg")
-          .mode("append")
-          .save(tableIdentifier.toString());
-
-      table.refresh();
-      long firstSnapshotTimestamp = table.currentSnapshot().timestampMillis();
-      long firstSnapshotId = table.currentSnapshot().snapshotId();
-
-      inputDf.select("id", "data").write()
-          .format("iceberg")
-          .mode("append")
-          .save(tableIdentifier.toString());
-
-      table.refresh();
-      long secondSnapshotTimestamp = table.currentSnapshot().timestampMillis();
-      long secondSnapshotId = table.currentSnapshot().snapshotId();
-
-      // rollback the table state to the first snapshot
-      table.rollback().toSnapshotId(firstSnapshotId).commit();
-      long rollbackTimestamp = Iterables.getLast(table.history()).timestampMillis();
-
-      inputDf.select("id", "data").write()
-          .format("iceberg")
-          .mode("append")
-          .save(tableIdentifier.toString());
-
-      table.refresh();
-      long thirdSnapshotTimestamp = table.currentSnapshot().timestampMillis();
-      long thirdSnapshotId = table.currentSnapshot().snapshotId();
-
-      List<Row> actual = spark.read()
-          .format("iceberg")
-          .load("db.history_test.history")
-          .collectAsList();
-
-      GenericRecordBuilder builder = new GenericRecordBuilder(AvroSchemaUtil.convert(historyTable.schema(), "history"));
-      List<GenericData.Record> expected = Lists.newArrayList(
-          builder.set("made_current_at", firstSnapshotTimestamp * 1000)
-              .set("snapshot_id", firstSnapshotId)
-              .set("parent_id", null)
-              .set("is_current_ancestor", true)
-              .build(),
-          builder.set("made_current_at", secondSnapshotTimestamp * 1000)
-              .set("snapshot_id", secondSnapshotId)
-              .set("parent_id", firstSnapshotId)
-              .set("is_current_ancestor", false) // commit rolled back, not an ancestor of the current table state
-              .build(),
-          builder.set("made_current_at", rollbackTimestamp * 1000)
-              .set("snapshot_id", firstSnapshotId)
-              .set("parent_id", null)
-              .set("is_current_ancestor", true)
-              .build(),
-          builder.set("made_current_at", thirdSnapshotTimestamp * 1000)
-              .set("snapshot_id", thirdSnapshotId)
-              .set("parent_id", firstSnapshotId)
-              .set("is_current_ancestor", true)
-              .build()
-      );
-
-      Assert.assertEquals("History table should have a row for each commit", 4, actual.size());
-      TestHelpers.assertEqualsSafe(historyTable.schema().asStruct(), expected.get(0), actual.get(0));
-      TestHelpers.assertEqualsSafe(historyTable.schema().asStruct(), expected.get(1), actual.get(1));
-      TestHelpers.assertEqualsSafe(historyTable.schema().asStruct(), expected.get(2), actual.get(2));
-
-    } finally {
-      clients.run(client -> {
-        client.dropTable(tableIdentifier.namespace().level(0), tableIdentifier.name());
-        return null;
-      });
-    }
+  @After
+  public void dropTable() throws Exception {
+    clients.run(client -> {
+      client.dropTable(TestIcebergSourceHiveTables.currentIdentifier.namespace().level(0),
+          TestIcebergSourceHiveTables.currentIdentifier.name());
+      return null;
+    });
   }
 
-  @Test
-  public synchronized void testHiveSnapshotsTable() throws Exception {
-    TableIdentifier tableIdentifier = TableIdentifier.of("db", "snapshots_test");
-    try {
-      Table table = catalog.createTable(tableIdentifier, SCHEMA, PartitionSpec.unpartitioned());
-      Table snapTable = catalog.loadTable(TableIdentifier.of("db", "snapshots_test", "snapshots"));
-
-      List<SimpleRecord> records = Lists.newArrayList(new SimpleRecord(1, "1"));
-      Dataset<Row> inputDf = spark.createDataFrame(records, SimpleRecord.class);
-
-      inputDf.select("id", "data").write()
-          .format("iceberg")
-          .mode("append")
-          .save(tableIdentifier.toString());
-
-      table.refresh();
-      long firstSnapshotTimestamp = table.currentSnapshot().timestampMillis();
-      long firstSnapshotId = table.currentSnapshot().snapshotId();
-      String firstManifestList = table.currentSnapshot().manifestListLocation();
-
-      table.newDelete().deleteFromRowFilter(Expressions.alwaysTrue()).commit();
-
-      long secondSnapshotTimestamp = table.currentSnapshot().timestampMillis();
-      long secondSnapshotId = table.currentSnapshot().snapshotId();
-      String secondManifestList = table.currentSnapshot().manifestListLocation();
-
-      // rollback the table state to the first snapshot
-      table.rollback().toSnapshotId(firstSnapshotId).commit();
-
-      List<Row> actual = spark.read()
-          .format("iceberg")
-          .load("db.snapshots_test.snapshots")
-          .collectAsList();
-
-      GenericRecordBuilder builder = new GenericRecordBuilder(AvroSchemaUtil.convert(snapTable.schema(), "snapshots"));
-      List<GenericData.Record> expected = Lists.newArrayList(
-          builder.set("committed_at", firstSnapshotTimestamp * 1000)
-              .set("snapshot_id", firstSnapshotId)
-              .set("parent_id", null)
-              .set("operation", "append")
-              .set("manifest_list", firstManifestList)
-              .set("summary", ImmutableMap.of(
-                  "added-records", "1",
-                  "added-data-files", "1",
-                  "changed-partition-count", "1",
-                  "total-data-files", "1",
-                  "total-records", "1"
-              ))
-              .build(),
-          builder.set("committed_at", secondSnapshotTimestamp * 1000)
-              .set("snapshot_id", secondSnapshotId)
-              .set("parent_id", firstSnapshotId)
-              .set("operation", "delete")
-              .set("manifest_list", secondManifestList)
-              .set("summary", ImmutableMap.of(
-                  "deleted-records", "1",
-                  "deleted-data-files", "1",
-                  "changed-partition-count", "1",
-                  "total-records", "0",
-                  "total-data-files", "0"
-              ))
-              .build()
-      );
-
-      Assert.assertEquals("Snapshots table should have a row for each snapshot", 2, actual.size());
-      TestHelpers.assertEqualsSafe(snapTable.schema().asStruct(), expected.get(0), actual.get(0));
-      TestHelpers.assertEqualsSafe(snapTable.schema().asStruct(), expected.get(1), actual.get(1));
-
-    } finally {
-      clients.run(client -> {
-        client.dropTable(tableIdentifier.namespace().level(0), tableIdentifier.name());
-        return null;
-      });
-    }
+  @Override
+  public Table createTable(TableIdentifier ident, Schema schema, PartitionSpec spec) {
+    TestIcebergSourceHiveTables.currentIdentifier = ident;
+    return TestIcebergSourceHiveTables.catalog.createTable(ident, schema, spec);
   }
 
-  @Test
-  public synchronized void testHiveManifestsTable() throws Exception {
-    TableIdentifier tableIdentifier = TableIdentifier.of("db", "manifests_test");
-    try {
-      Table table = catalog.createTable(
-          tableIdentifier,
-          SCHEMA,
-          PartitionSpec.builderFor(SCHEMA).identity("id").build());
-      Table manifestTable = catalog.loadTable(TableIdentifier.of("db", "manifests_test", "manifests"));
-
-      Dataset<Row> df1 = spark.createDataFrame(Lists.newArrayList(new SimpleRecord(1, "a")), SimpleRecord.class);
-
-      df1.select("id", "data").write()
-          .format("iceberg")
-          .mode("append")
-          .save(tableIdentifier.toString());
-
-      List<Row> actual = spark.read()
-          .format("iceberg")
-          .load("db.manifests_test.manifests")
-          .collectAsList();
-
-      table.refresh();
-
-      GenericRecordBuilder builder = new GenericRecordBuilder(AvroSchemaUtil.convert(
-          manifestTable.schema(), "manifests"));
-      GenericRecordBuilder summaryBuilder = new GenericRecordBuilder(AvroSchemaUtil.convert(
-          manifestTable.schema().findType("partition_summaries.element").asStructType(), "partition_summary"));
-      List<GenericData.Record> expected = Lists.transform(table.currentSnapshot().manifests(), manifest ->
-          builder.set("path", manifest.path())
-              .set("length", manifest.length())
-              .set("partition_spec_id", manifest.partitionSpecId())
-              .set("added_snapshot_id", manifest.snapshotId())
-              .set("added_data_files_count", manifest.addedFilesCount())
-              .set("existing_data_files_count", manifest.existingFilesCount())
-              .set("deleted_data_files_count", manifest.deletedFilesCount())
-              .set("partition_summaries", Lists.transform(manifest.partitions(), partition ->
-                  summaryBuilder
-                      .set("contains_null", false)
-                      .set("lower_bound", "1")
-                      .set("upper_bound", "1")
-                      .build()
-                  ))
-              .build()
-      );
-
-      Assert.assertEquals("Manifests table should have one manifest row", 1, actual.size());
-      TestHelpers.assertEqualsSafe(manifestTable.schema().asStruct(), expected.get(0), actual.get(0));
-
-    } finally {
-      clients.run(client -> {
-        client.dropTable(tableIdentifier.namespace().level(0), tableIdentifier.name());
-        return null;
-      });
-    }
+  @Override
+  public Table loadTable(TableIdentifier ident, String entriesSuffix) {
+    TableIdentifier identifier = TableIdentifier.of(ident.namespace().level(0), ident.name(), entriesSuffix);
+    return TestIcebergSourceHiveTables.catalog.loadTable(identifier);
   }
 
-  @Test
-  public synchronized void testHiveAllManifestsTable() throws Exception {
-    TableIdentifier tableIdentifier = TableIdentifier.of("db", "manifests_test");
-    try {
-      Table table = catalog.createTable(
-          tableIdentifier,
-          SCHEMA,
-          PartitionSpec.builderFor(SCHEMA).identity("id").build());
-      Table manifestTable = catalog.loadTable(TableIdentifier.of("db", "manifests_test", "all_manifests"));
-
-      Dataset<Row> df1 = spark.createDataFrame(Lists.newArrayList(new SimpleRecord(1, "a")), SimpleRecord.class);
-
-      List<ManifestFile> manifests = Lists.newArrayList();
-
-      df1.select("id", "data").write()
-          .format("iceberg")
-          .mode("append")
-          .save(tableIdentifier.toString());
-
-      manifests.addAll(table.currentSnapshot().manifests());
-
-      table.newDelete().deleteFromRowFilter(Expressions.alwaysTrue()).commit();
-
-      manifests.addAll(table.currentSnapshot().manifests());
-
-      List<Row> actual = spark.read()
-          .format("iceberg")
-          .load("db.manifests_test.all_manifests")
-          .orderBy("path")
-          .collectAsList();
-
-      table.refresh();
-
-      GenericRecordBuilder builder = new GenericRecordBuilder(AvroSchemaUtil.convert(
-          manifestTable.schema(), "manifests"));
-      GenericRecordBuilder summaryBuilder = new GenericRecordBuilder(AvroSchemaUtil.convert(
-          manifestTable.schema().findType("partition_summaries.element").asStructType(), "partition_summary"));
-      List<GenericData.Record> expected = Lists.newArrayList(Iterables.transform(manifests, manifest ->
-          builder.set("path", manifest.path())
-              .set("length", manifest.length())
-              .set("partition_spec_id", manifest.partitionSpecId())
-              .set("added_snapshot_id", manifest.snapshotId())
-              .set("added_data_files_count", manifest.addedFilesCount())
-              .set("existing_data_files_count", manifest.existingFilesCount())
-              .set("deleted_data_files_count", manifest.deletedFilesCount())
-              .set("partition_summaries", Lists.transform(manifest.partitions(), partition ->
-                  summaryBuilder
-                      .set("contains_null", false)
-                      .set("lower_bound", "1")
-                      .set("upper_bound", "1")
-                      .build()
-              ))
-              .build()
-      ));
-
-      expected.sort(Comparator.comparing(o -> o.get("path").toString()));
-
-      Assert.assertEquals("Manifests table should have two manifest rows", 2, actual.size());
-      for (int i = 0; i < expected.size(); i += 1) {
-        TestHelpers.assertEqualsSafe(manifestTable.schema().asStruct(), expected.get(i), actual.get(i));
-      }
-
-    } finally {
-      clients.run(client -> {
-        client.dropTable(tableIdentifier.namespace().level(0), tableIdentifier.name());
-        return null;
-      });
-    }
+  @Override
+  public String loadLocation(TableIdentifier ident, String entriesSuffix) {
+    return String.format("%s.%s", loadLocation(ident), entriesSuffix);
   }
 
-  @Test
-  public synchronized void testHivePartitionsTable() throws Exception {
-    TableIdentifier ident = TableIdentifier.of("db", "partitions_test");
-    try {
-      Table table = catalog.createTable(ident, SCHEMA, PartitionSpec.builderFor(SCHEMA).identity("id").build());
-      Table partitionsTable = catalog.loadTable(TableIdentifier.of("db", "partitions_test", "partitions"));
-
-      Dataset<Row> df1 = spark.createDataFrame(Lists.newArrayList(new SimpleRecord(1, "a")), SimpleRecord.class);
-      Dataset<Row> df2 = spark.createDataFrame(Lists.newArrayList(new SimpleRecord(2, "b")), SimpleRecord.class);
-
-      df1.select("id", "data").write()
-          .format("iceberg")
-          .mode("append")
-          .save(ident.toString());
-
-      table.refresh();
-      long firstCommitId = table.currentSnapshot().snapshotId();
-
-      // add a second file
-      df2.select("id", "data").write()
-          .format("iceberg")
-          .mode("append")
-          .save(ident.toString());
-
-      List<Row> actual = spark.read()
-          .format("iceberg")
-          .load("db.partitions_test.partitions")
-          .orderBy("partition.id")
-          .collectAsList();
-
-      GenericRecordBuilder builder = new GenericRecordBuilder(AvroSchemaUtil.convert(
-          partitionsTable.schema(), "partitions"));
-      GenericRecordBuilder partitionBuilder = new GenericRecordBuilder(AvroSchemaUtil.convert(
-          partitionsTable.schema().findType("partition").asStructType(), "partition"));
-      List<GenericData.Record> expected = Lists.newArrayList();
-      expected.add(builder
-          .set("partition", partitionBuilder.set("id", 1).build())
-          .set("record_count", 1L)
-          .set("file_count", 1)
-          .build());
-      expected.add(builder
-          .set("partition", partitionBuilder.set("id", 2).build())
-          .set("record_count", 1L)
-          .set("file_count", 1)
-          .build());
-
-      Assert.assertEquals("Partitions table should have two rows", 2, expected.size());
-      Assert.assertEquals("Actual results should have two rows", 2, actual.size());
-      for (int i = 0; i < 2; i += 1) {
-        TestHelpers.assertEqualsSafe(partitionsTable.schema().asStruct(), expected.get(i), actual.get(i));
-      }
-
-      // check time travel
-      List<Row> actualAfterFirstCommit = spark.read()
-          .format("iceberg")
-          .option("snapshot-id", String.valueOf(firstCommitId))
-          .load("db.partitions_test.partitions")
-          .orderBy("partition.id")
-          .collectAsList();
-
-      Assert.assertEquals("Actual results should have one row", 1, actualAfterFirstCommit.size());
-      TestHelpers.assertEqualsSafe(partitionsTable.schema().asStruct(), expected.get(0), actualAfterFirstCommit.get(0));
-
-    } finally {
-      clients.run(client -> {
-        client.dropTable(ident.namespace().level(0), ident.name());
-        return null;
-      });
-    }
+  @Override
+  public String loadLocation(TableIdentifier ident) {
+    return ident.toString();
   }
 }
diff --git a/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceHadoopTables.java b/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
similarity index 79%
copy from spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceHadoopTables.java
copy to spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
index 9be496e..fc27ea8 100644
--- a/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceHadoopTables.java
+++ b/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
@@ -22,12 +22,10 @@ package org.apache.iceberg.spark.source;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
-import java.io.File;
 import java.util.Comparator;
 import java.util.List;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericRecordBuilder;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.ManifestFile;
 import org.apache.iceberg.PartitionSpec;
@@ -37,69 +35,70 @@ import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.avro.Avro;
 import org.apache.iceberg.avro.AvroSchemaUtil;
+import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.expressions.Expressions;
-import org.apache.iceberg.hadoop.HadoopTables;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.InputFile;
 import org.apache.iceberg.spark.SparkTableUtil;
 import org.apache.iceberg.spark.data.TestHelpers;
 import org.apache.iceberg.types.Types;
 import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoders;
 import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SaveMode;
 import org.apache.spark.sql.SparkSession;
-import org.apache.spark.sql.catalyst.TableIdentifier;
-import org.junit.AfterClass;
 import org.junit.Assert;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Rule;
 import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
 
 import static org.apache.iceberg.types.Types.NestedField.optional;
 
-public class TestIcebergSourceHadoopTables {
+public abstract class TestIcebergSourceTablesBase {
 
+  protected static SparkSession spark;
   private static final Schema SCHEMA = new Schema(
       optional(1, "id", Types.IntegerType.get()),
       optional(2, "data", Types.StringType.get())
   );
 
-  private static SparkSession spark;
-  private static final HadoopTables TABLES = new HadoopTables(new Configuration());
+  public abstract Table createTable(TableIdentifier ident, Schema schema, PartitionSpec spec);
 
-  @BeforeClass
-  public static void startSpark() {
-    TestIcebergSourceHadoopTables.spark = SparkSession.builder()
-        .master("local[2]")
-        .getOrCreate();
-  }
+  public abstract Table loadTable(TableIdentifier ident, String entriesSuffix);
 
-  @AfterClass
-  public static void stopSpark() {
-    TestIcebergSourceHadoopTables.spark.stop();
-    TestIcebergSourceHadoopTables.spark = null;
-  }
+  public abstract String loadLocation(TableIdentifier ident, String entriesSuffix);
 
-  @Rule
-  public TemporaryFolder temp = new TemporaryFolder();
+  public abstract String loadLocation(TableIdentifier ident);
 
-  File tableDir = null;
-  String tableLocation = null;
+  @Test
+  public synchronized void testTablesSupport() {
+    TableIdentifier tableIdentifier = TableIdentifier.of("db", "table");
+    createTable(tableIdentifier, SCHEMA, PartitionSpec.unpartitioned());
 
-  @Before
-  public void setupTable() throws Exception {
-    this.tableDir = temp.newFolder();
-    tableDir.delete(); // created by table create
+    List<SimpleRecord> expectedRecords = Lists.newArrayList(
+        new SimpleRecord(1, "1"),
+        new SimpleRecord(2, "2"),
+        new SimpleRecord(3, "3"));
+
+    Dataset<Row> inputDf = spark.createDataFrame(expectedRecords, SimpleRecord.class);
+    inputDf.select("id", "data").write()
+        .format("iceberg")
+        .mode(SaveMode.Append)
+        .save(loadLocation(tableIdentifier));
+
+    Dataset<Row> resultDf = spark.read()
+        .format("iceberg")
+        .load(loadLocation(tableIdentifier));
+    List<SimpleRecord> actualRecords = resultDf.orderBy("id")
+        .as(Encoders.bean(SimpleRecord.class))
+        .collectAsList();
 
-    this.tableLocation = tableDir.toURI().toString();
+    Assert.assertEquals("Records should match", expectedRecords, actualRecords);
   }
 
   @Test
   public void testEntriesTable() throws Exception {
-    Table table = TABLES.create(SCHEMA, tableLocation);
-    System.out.println(tableLocation);
-    Table entriesTable = TABLES.load(tableLocation + "#entries");
+    TableIdentifier tableIdentifier = TableIdentifier.of("db", "entries_test");
+    Table table = createTable(tableIdentifier, SCHEMA, PartitionSpec.unpartitioned());
+    Table entriesTable = loadTable(tableIdentifier, "entries");
 
     List<SimpleRecord> records = Lists.newArrayList(new SimpleRecord(1, "1"));
 
@@ -107,13 +106,13 @@ public class TestIcebergSourceHadoopTables {
     inputDf.select("id", "data").write()
         .format("iceberg")
         .mode("append")
-        .save(tableLocation);
+        .save(loadLocation(tableIdentifier));
 
     table.refresh();
 
     List<Row> actual = spark.read()
         .format("iceberg")
-        .load(tableLocation + "#entries")
+        .load(loadLocation(tableIdentifier, "entries"))
         .collectAsList();
 
     Assert.assertEquals("Should only contain one manifest", 1, table.currentSnapshot().manifests().size());
@@ -130,17 +129,17 @@ public class TestIcebergSourceHadoopTables {
 
   @Test
   public void testAllEntriesTable() throws Exception {
-    Table table = TABLES.create(SCHEMA, tableLocation);
-    System.out.println(tableLocation);
-    Table entriesTable = TABLES.load(tableLocation + "#all_entries");
+    TableIdentifier tableIdentifier = TableIdentifier.of("db", "entries_test");
+    Table table = createTable(tableIdentifier, SCHEMA, PartitionSpec.unpartitioned());
+    Table entriesTable = loadTable(tableIdentifier, "all_entries");
 
     Dataset<Row> df1 = spark.createDataFrame(Lists.newArrayList(new SimpleRecord(1, "a")), SimpleRecord.class);
-    Dataset<Row> df2 = spark.createDataFrame(Lists.newArrayList(new SimpleRecord(2, "b")), SimpleRecord.class);
+    Dataset<Row> df2 = spark.createDataFrame(Lists.newArrayList(new SimpleRecord(1, "b")), SimpleRecord.class);
 
     df1.select("id", "data").write()
         .format("iceberg")
         .mode("append")
-        .save(tableLocation);
+        .save(loadLocation(tableIdentifier));
 
     // delete the first file to test that not only live files are listed
     table.newDelete().deleteFromRowFilter(Expressions.equal("id", 1)).commit();
@@ -149,14 +148,14 @@ public class TestIcebergSourceHadoopTables {
     df2.select("id", "data").write()
         .format("iceberg")
         .mode("append")
-        .save(tableLocation);
+        .save(loadLocation(tableIdentifier));
 
     // ensure table data isn't stale
     table.refresh();
 
     List<Row> actual = spark.read()
         .format("iceberg")
-        .load(tableLocation + "#all_entries")
+        .load(loadLocation(tableIdentifier, "all_entries"))
         .orderBy("snapshot_id")
         .collectAsList();
 
@@ -181,9 +180,10 @@ public class TestIcebergSourceHadoopTables {
 
   @Test
   public void testFilesTable() throws Exception {
-    Table table = TABLES.create(SCHEMA, PartitionSpec.builderFor(SCHEMA).identity("id").build(), tableLocation);
-    Table entriesTable = TABLES.load(tableLocation + "#entries");
-    Table filesTable = TABLES.load(tableLocation + "#files");
+    TableIdentifier tableIdentifier = TableIdentifier.of("db", "files_test");
+    Table table = createTable(tableIdentifier, SCHEMA, PartitionSpec.builderFor(SCHEMA).identity("id").build());
+    Table entriesTable = loadTable(tableIdentifier, "entries");
+    Table filesTable = loadTable(tableIdentifier, "files");
 
     Dataset<Row> df1 = spark.createDataFrame(Lists.newArrayList(new SimpleRecord(1, "a")), SimpleRecord.class);
     Dataset<Row> df2 = spark.createDataFrame(Lists.newArrayList(new SimpleRecord(2, "b")), SimpleRecord.class);
@@ -191,20 +191,20 @@ public class TestIcebergSourceHadoopTables {
     df1.select("id", "data").write()
         .format("iceberg")
         .mode("append")
-        .save(tableLocation);
+        .save(loadLocation(tableIdentifier));
 
     // add a second file
     df2.select("id", "data").write()
         .format("iceberg")
         .mode("append")
-        .save(tableLocation);
+        .save(loadLocation(tableIdentifier));
 
     // delete the first file to test that only live files are listed
     table.newDelete().deleteFromRowFilter(Expressions.equal("id", 1)).commit();
 
     List<Row> actual = spark.read()
         .format("iceberg")
-        .load(tableLocation + "#files")
+        .load(loadLocation(tableIdentifier, "files"))
         .collectAsList();
 
     List<GenericData.Record> expected = Lists.newArrayList();
@@ -226,14 +226,13 @@ public class TestIcebergSourceHadoopTables {
 
   @Test
   public void testFilesTableWithSnapshotIdInheritance() throws Exception {
-    Table table = TABLES.create(SCHEMA, PartitionSpec.builderFor(SCHEMA).identity("id").build(), tableLocation);
-
+    TableIdentifier tableIdentifier = TableIdentifier.of("db", "files_inheritance_test");
+    Table table = createTable(tableIdentifier, SCHEMA, PartitionSpec.builderFor(SCHEMA).identity("id").build());
     table.updateProperties()
         .set(TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED, "true")
         .commit();
-
-    Table entriesTable = TABLES.load(tableLocation + "#entries");
-    Table filesTable = TABLES.load(tableLocation + "#files");
+    Table entriesTable = loadTable(tableIdentifier, "entries");
+    Table filesTable = loadTable(tableIdentifier, "files");
 
     List<SimpleRecord> records = Lists.newArrayList(
         new SimpleRecord(1, "a"),
@@ -249,11 +248,13 @@ public class TestIcebergSourceHadoopTables {
           .saveAsTable("parquet_table");
 
       String stagingLocation = table.location() + "/metadata";
-      SparkTableUtil.importSparkTable(spark, new TableIdentifier("parquet_table"), table, stagingLocation);
+      SparkTableUtil.importSparkTable(spark,
+          new org.apache.spark.sql.catalyst.TableIdentifier("parquet_table"),
+          table, stagingLocation);
 
       List<Row> actual = spark.read()
           .format("iceberg")
-          .load(tableLocation + "#files")
+          .load(loadLocation(tableIdentifier, "files"))
           .collectAsList();
 
       List<GenericData.Record> expected = Lists.newArrayList();
@@ -270,17 +271,18 @@ public class TestIcebergSourceHadoopTables {
       Assert.assertEquals("Actual results should have one row", 2, actual.size());
       TestHelpers.assertEqualsSafe(filesTable.schema().asStruct(), expected.get(0), actual.get(0));
       TestHelpers.assertEqualsSafe(filesTable.schema().asStruct(), expected.get(1), actual.get(1));
-
     } finally {
       spark.sql("DROP TABLE parquet_table");
     }
+
   }
 
   @Test
   public void testFilesUnpartitionedTable() throws Exception {
-    Table table = TABLES.create(SCHEMA, tableLocation);
-    Table entriesTable = TABLES.load(tableLocation + "#entries");
-    Table filesTable = TABLES.load(tableLocation + "#files");
+    TableIdentifier tableIdentifier = TableIdentifier.of("db", "unpartitioned_files_test");
+    Table table = createTable(tableIdentifier, SCHEMA, PartitionSpec.unpartitioned());
+    Table entriesTable = loadTable(tableIdentifier, "entries");
+    Table filesTable = loadTable(tableIdentifier, "files");
 
     Dataset<Row> df1 = spark.createDataFrame(Lists.newArrayList(new SimpleRecord(1, "a")), SimpleRecord.class);
     Dataset<Row> df2 = spark.createDataFrame(Lists.newArrayList(new SimpleRecord(2, "b")), SimpleRecord.class);
@@ -288,7 +290,7 @@ public class TestIcebergSourceHadoopTables {
     df1.select("id", "data").write()
         .format("iceberg")
         .mode("append")
-        .save(tableLocation);
+        .save(loadLocation(tableIdentifier));
 
     table.refresh();
     DataFile toDelete = Iterables.getOnlyElement(table.currentSnapshot().addedFiles());
@@ -297,14 +299,14 @@ public class TestIcebergSourceHadoopTables {
     df2.select("id", "data").write()
         .format("iceberg")
         .mode("append")
-        .save(tableLocation);
+        .save(loadLocation(tableIdentifier));
 
     // delete the first file to test that only live files are listed
     table.newDelete().deleteFile(toDelete).commit();
 
     List<Row> actual = spark.read()
         .format("iceberg")
-        .load(tableLocation + "#files")
+        .load(loadLocation(tableIdentifier, "files"))
         .collectAsList();
 
     List<GenericData.Record> expected = Lists.newArrayList();
@@ -326,9 +328,10 @@ public class TestIcebergSourceHadoopTables {
 
   @Test
   public void testAllDataFilesTable() throws Exception {
-    Table table = TABLES.create(SCHEMA, PartitionSpec.builderFor(SCHEMA).identity("id").build(), tableLocation);
-    Table entriesTable = TABLES.load(tableLocation + "#entries");
-    Table filesTable = TABLES.load(tableLocation + "#all_data_files");
+    TableIdentifier tableIdentifier = TableIdentifier.of("db", "files_test");
+    Table table = createTable(tableIdentifier, SCHEMA, PartitionSpec.builderFor(SCHEMA).identity("id").build());
+    Table entriesTable = loadTable(tableIdentifier, "entries");
+    Table filesTable = loadTable(tableIdentifier, "all_data_files");
 
     Dataset<Row> df1 = spark.createDataFrame(Lists.newArrayList(new SimpleRecord(1, "a")), SimpleRecord.class);
     Dataset<Row> df2 = spark.createDataFrame(Lists.newArrayList(new SimpleRecord(2, "b")), SimpleRecord.class);
@@ -336,7 +339,7 @@ public class TestIcebergSourceHadoopTables {
     df1.select("id", "data").write()
         .format("iceberg")
         .mode("append")
-        .save(tableLocation);
+        .save(loadLocation(tableIdentifier));
 
     // delete the first file to test that not only live files are listed
     table.newDelete().deleteFromRowFilter(Expressions.equal("id", 1)).commit();
@@ -345,14 +348,14 @@ public class TestIcebergSourceHadoopTables {
     df2.select("id", "data").write()
         .format("iceberg")
         .mode("append")
-        .save(tableLocation);
+        .save(loadLocation(tableIdentifier));
 
     // ensure table data isn't stale
     table.refresh();
 
     List<Row> actual = spark.read()
         .format("iceberg")
-        .load(tableLocation + "#all_data_files")
+        .load(loadLocation(tableIdentifier, "all_data_files"))
         .orderBy("file_path")
         .collectAsList();
     actual.sort(Comparator.comparing(o -> o.getString(0)));
@@ -380,8 +383,9 @@ public class TestIcebergSourceHadoopTables {
 
   @Test
   public void testHistoryTable() {
-    Table table = TABLES.create(SCHEMA, tableLocation);
-    Table historyTable = TABLES.load(tableLocation + "#history");
+    TableIdentifier tableIdentifier = TableIdentifier.of("db", "history_test");
+    Table table = createTable(tableIdentifier, SCHEMA, PartitionSpec.unpartitioned());
+    Table historyTable = loadTable(tableIdentifier, "history");
 
     List<SimpleRecord> records = Lists.newArrayList(new SimpleRecord(1, "1"));
     Dataset<Row> inputDf = spark.createDataFrame(records, SimpleRecord.class);
@@ -389,7 +393,7 @@ public class TestIcebergSourceHadoopTables {
     inputDf.select("id", "data").write()
         .format("iceberg")
         .mode("append")
-        .save(tableLocation);
+        .save(loadLocation(tableIdentifier));
 
     table.refresh();
     long firstSnapshotTimestamp = table.currentSnapshot().timestampMillis();
@@ -398,7 +402,7 @@ public class TestIcebergSourceHadoopTables {
     inputDf.select("id", "data").write()
         .format("iceberg")
         .mode("append")
-        .save(tableLocation);
+        .save(loadLocation(tableIdentifier));
 
     table.refresh();
     long secondSnapshotTimestamp = table.currentSnapshot().timestampMillis();
@@ -411,7 +415,7 @@ public class TestIcebergSourceHadoopTables {
     inputDf.select("id", "data").write()
         .format("iceberg")
         .mode("append")
-        .save(tableLocation);
+        .save(loadLocation(tableIdentifier));
 
     table.refresh();
     long thirdSnapshotTimestamp = table.currentSnapshot().timestampMillis();
@@ -419,7 +423,7 @@ public class TestIcebergSourceHadoopTables {
 
     List<Row> actual = spark.read()
         .format("iceberg")
-        .load(tableLocation + "#history")
+        .load(loadLocation(tableIdentifier, "history"))
         .collectAsList();
 
     GenericRecordBuilder builder = new GenericRecordBuilder(AvroSchemaUtil.convert(historyTable.schema(), "history"));
@@ -454,8 +458,9 @@ public class TestIcebergSourceHadoopTables {
 
   @Test
   public void testSnapshotsTable() {
-    Table table = TABLES.create(SCHEMA, tableLocation);
-    Table snapTable = TABLES.load(tableLocation + "#snapshots");
+    TableIdentifier tableIdentifier = TableIdentifier.of("db", "snapshots_test");
+    Table table = createTable(tableIdentifier, SCHEMA, PartitionSpec.unpartitioned());
+    Table snapTable = loadTable(tableIdentifier, "snapshots");
 
     List<SimpleRecord> records = Lists.newArrayList(new SimpleRecord(1, "1"));
     Dataset<Row> inputDf = spark.createDataFrame(records, SimpleRecord.class);
@@ -463,7 +468,7 @@ public class TestIcebergSourceHadoopTables {
     inputDf.select("id", "data").write()
         .format("iceberg")
         .mode("append")
-        .save(tableLocation);
+        .save(loadLocation(tableIdentifier));
 
     table.refresh();
     long firstSnapshotTimestamp = table.currentSnapshot().timestampMillis();
@@ -481,7 +486,7 @@ public class TestIcebergSourceHadoopTables {
 
     List<Row> actual = spark.read()
         .format("iceberg")
-        .load(tableLocation + "#snapshots")
+        .load(loadLocation(tableIdentifier, "snapshots"))
         .collectAsList();
 
     GenericRecordBuilder builder = new GenericRecordBuilder(AvroSchemaUtil.convert(snapTable.schema(), "snapshots"));
@@ -521,19 +526,19 @@ public class TestIcebergSourceHadoopTables {
 
   @Test
   public void testManifestsTable() {
-    Table table = TABLES.create(SCHEMA, PartitionSpec.builderFor(SCHEMA).identity("id").build(), tableLocation);
-    Table manifestTable = TABLES.load(tableLocation + "#manifests");
-
+    TableIdentifier tableIdentifier = TableIdentifier.of("db", "manifests_test");
+    Table table = createTable(tableIdentifier, SCHEMA, PartitionSpec.builderFor(SCHEMA).identity("id").build());
+    Table manifestTable = loadTable(tableIdentifier, "manifests");
     Dataset<Row> df1 = spark.createDataFrame(Lists.newArrayList(new SimpleRecord(1, "a")), SimpleRecord.class);
 
     df1.select("id", "data").write()
         .format("iceberg")
         .mode("append")
-        .save(tableLocation);
+        .save(loadLocation(tableIdentifier));
 
     List<Row> actual = spark.read()
         .format("iceberg")
-        .load(tableLocation + "#manifests")
+        .load(loadLocation(tableIdentifier, "manifests"))
         .collectAsList();
 
     table.refresh();
@@ -566,9 +571,9 @@ public class TestIcebergSourceHadoopTables {
 
   @Test
   public void testAllManifestsTable() {
-    Table table = TABLES.create(SCHEMA, PartitionSpec.builderFor(SCHEMA).identity("id").build(), tableLocation);
-    Table manifestTable = TABLES.load(tableLocation + "#all_manifests");
-
+    TableIdentifier tableIdentifier = TableIdentifier.of("db", "manifests_test");
+    Table table = createTable(tableIdentifier, SCHEMA, PartitionSpec.builderFor(SCHEMA).identity("id").build());
+    Table manifestTable = loadTable(tableIdentifier, "all_manifests");
     Dataset<Row> df1 = spark.createDataFrame(Lists.newArrayList(new SimpleRecord(1, "a")), SimpleRecord.class);
 
     List<ManifestFile> manifests = Lists.newArrayList();
@@ -576,7 +581,7 @@ public class TestIcebergSourceHadoopTables {
     df1.select("id", "data").write()
         .format("iceberg")
         .mode("append")
-        .save(tableLocation);
+        .save(loadLocation(tableIdentifier));
 
     manifests.addAll(table.currentSnapshot().manifests());
 
@@ -586,7 +591,7 @@ public class TestIcebergSourceHadoopTables {
 
     List<Row> actual = spark.read()
         .format("iceberg")
-        .load(tableLocation + "#all_manifests")
+        .load(loadLocation(tableIdentifier, "all_manifests"))
         .orderBy("path")
         .collectAsList();
 
@@ -624,16 +629,16 @@ public class TestIcebergSourceHadoopTables {
 
   @Test
   public void testPartitionsTable() {
-    Table table = TABLES.create(SCHEMA, PartitionSpec.builderFor(SCHEMA).identity("id").build(), tableLocation);
-    Table partitionsTable = TABLES.load(tableLocation + "#partitions");
-
+    TableIdentifier tableIdentifier = TableIdentifier.of("db", "partitions_test");
+    Table table = createTable(tableIdentifier, SCHEMA, PartitionSpec.builderFor(SCHEMA).identity("id").build());
+    Table partitionsTable = loadTable(tableIdentifier, "partitions");
     Dataset<Row> df1 = spark.createDataFrame(Lists.newArrayList(new SimpleRecord(1, "a")), SimpleRecord.class);
     Dataset<Row> df2 = spark.createDataFrame(Lists.newArrayList(new SimpleRecord(2, "b")), SimpleRecord.class);
 
     df1.select("id", "data").write()
         .format("iceberg")
         .mode("append")
-        .save(tableLocation);
+        .save(loadLocation(tableIdentifier));
 
     table.refresh();
     long firstCommitId = table.currentSnapshot().snapshotId();
@@ -642,11 +647,11 @@ public class TestIcebergSourceHadoopTables {
     df2.select("id", "data").write()
         .format("iceberg")
         .mode("append")
-        .save(tableLocation);
+        .save(loadLocation(tableIdentifier));
 
     List<Row> actual = spark.read()
         .format("iceberg")
-        .load(tableLocation + "#partitions")
+        .load(loadLocation(tableIdentifier, "partitions"))
         .orderBy("partition.id")
         .collectAsList();
 
@@ -676,7 +681,7 @@ public class TestIcebergSourceHadoopTables {
     List<Row> actualAfterFirstCommit = spark.read()
         .format("iceberg")
         .option("snapshot-id", String.valueOf(firstCommitId))
-        .load(tableLocation + "#partitions")
+        .load(loadLocation(tableIdentifier, "partitions"))
         .orderBy("partition.id")
         .collectAsList();