You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by bl...@apache.org on 2022/05/24 14:58:45 UTC

[iceberg] branch 0.13.x updated: Core: Backport filter pushdown fix for metadata tables with evolved specs to 0.13 (#4520) (#4569)

This is an automated email from the ASF dual-hosted git repository.

blue pushed a commit to branch 0.13.x
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/0.13.x by this push:
     new 5b3e462324 Core: Backport filter pushdown fix for metadata tables with evolved specs to 0.13 (#4520) (#4569)
5b3e462324 is described below

commit 5b3e46232446ef9c1fab719d4c331347fe44fb68
Author: Szehon Ho <sz...@gmail.com>
AuthorDate: Tue May 24 07:58:39 2022 -0700

    Core: Backport filter pushdown fix for metadata tables with evolved specs to 0.13 (#4520) (#4569)
---
 .../java/org/apache/iceberg/BaseMetadataTable.java |  10 +-
 .../java/org/apache/iceberg/DataFilesTable.java    |  22 ++-
 .../java/org/apache/iceberg/PartitionsTable.java   |   2 +-
 .../org/apache/iceberg/TestMetadataTableScans.java | 204 +++++++++++++++++++++
 .../TestMetadataTablesWithPartitionEvolution.java  | 148 +++++++++++++--
 5 files changed, 358 insertions(+), 28 deletions(-)

diff --git a/core/src/main/java/org/apache/iceberg/BaseMetadataTable.java b/core/src/main/java/org/apache/iceberg/BaseMetadataTable.java
index d9b2f797a4..f4966352ec 100644
--- a/core/src/main/java/org/apache/iceberg/BaseMetadataTable.java
+++ b/core/src/main/java/org/apache/iceberg/BaseMetadataTable.java
@@ -35,7 +35,6 @@ import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
  * deserialization.
  */
 abstract class BaseMetadataTable implements Table, HasTableOperations, Serializable {
-  protected static final String PARTITION_FIELD_PREFIX = "partition.";
   private final PartitionSpec spec = PartitionSpec.unpartitioned();
   private final SortOrder sortOrder = SortOrder.unsorted();
   private final TableOperations ops;
@@ -52,18 +51,17 @@ abstract class BaseMetadataTable implements Table, HasTableOperations, Serializa
    * This method transforms the table's partition spec to a spec that is used to rewrite the user-provided filter
    * expression against the given metadata table.
    * <p>
-   * The resulting partition spec maps $partitionPrefix.X fields to partition X using an identity partition transform.
+   * The resulting partition spec maps partition.X fields to partition X using an identity partition transform.
    * When this spec is used to project an expression for the given metadata table, the projection will remove
-   * predicates for non-partition fields (not in the spec) and will remove the "$partitionPrefix." prefix from fields.
+   * predicates for non-partition fields (not in the spec) and will remove the "partition." prefix from fields.
    *
    * @param metadataTableSchema schema of the metadata table
    * @param spec spec on which the metadata table schema is based
-   * @param partitionPrefix prefix to remove from each field in the partition spec
    * @return a spec used to rewrite the metadata table filters to partition filters using an inclusive projection
    */
-  static PartitionSpec transformSpec(Schema metadataTableSchema, PartitionSpec spec, String partitionPrefix) {
+  static PartitionSpec transformSpec(Schema metadataTableSchema, PartitionSpec spec) {
     PartitionSpec.Builder identitySpecBuilder = PartitionSpec.builderFor(metadataTableSchema).checkConflicts(false);
-    spec.fields().forEach(pf -> identitySpecBuilder.identity(partitionPrefix + pf.name(), pf.name()));
+    spec.fields().forEach(pf -> identitySpecBuilder.add(pf.fieldId(), pf.name(), "identity"));
     return identitySpecBuilder.build();
   }
 
diff --git a/core/src/main/java/org/apache/iceberg/DataFilesTable.java b/core/src/main/java/org/apache/iceberg/DataFilesTable.java
index 31c0bea319..c20332b770 100644
--- a/core/src/main/java/org/apache/iceberg/DataFilesTable.java
+++ b/core/src/main/java/org/apache/iceberg/DataFilesTable.java
@@ -19,10 +19,12 @@
 
 package org.apache.iceberg;
 
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.github.benmanes.caffeine.cache.LoadingCache;
+import java.util.Map;
 import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.expressions.Expressions;
 import org.apache.iceberg.expressions.ManifestEvaluator;
-import org.apache.iceberg.expressions.Projections;
 import org.apache.iceberg.expressions.ResidualEvaluator;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.FileIO;
@@ -108,16 +110,16 @@ public class DataFilesTable extends BaseMetadataTable {
       Expression filter = ignoreResiduals ? Expressions.alwaysTrue() : rowFilter;
       ResidualEvaluator residuals = ResidualEvaluator.unpartitioned(filter);
 
-      // use an inclusive projection to remove the partition name prefix and filter out any non-partition expressions
-      Expression partitionFilter = Projections
-          .inclusive(
-              transformSpec(fileSchema, table().spec(), PARTITION_FIELD_PREFIX),
-              caseSensitive)
-          .project(rowFilter);
+      Map<Integer, PartitionSpec> specsById = table().specs();
 
-      ManifestEvaluator manifestEval = ManifestEvaluator.forPartitionFilter(
-          partitionFilter, table().spec(), caseSensitive);
-      CloseableIterable<ManifestFile> filtered = CloseableIterable.filter(manifests, manifestEval::eval);
+      LoadingCache<Integer, ManifestEvaluator> evalCache = Caffeine.newBuilder().build(specId -> {
+        PartitionSpec spec = specsById.get(specId);
+        PartitionSpec transformedSpec = transformSpec(fileSchema, spec);
+        return ManifestEvaluator.forRowFilter(rowFilter, transformedSpec, caseSensitive);
+      });
+
+      CloseableIterable<ManifestFile> filtered = CloseableIterable.filter(manifests,
+          manifest -> evalCache.get(manifest.partitionSpecId()).eval(manifest));
 
       // Data tasks produce the table schema, not the projection schema and projection is done by processing engines.
       // This data task needs to use the table schema, which may not include a partition schema to avoid having an
diff --git a/core/src/main/java/org/apache/iceberg/PartitionsTable.java b/core/src/main/java/org/apache/iceberg/PartitionsTable.java
index cc9f716a4d..dbf1d39d11 100644
--- a/core/src/main/java/org/apache/iceberg/PartitionsTable.java
+++ b/core/src/main/java/org/apache/iceberg/PartitionsTable.java
@@ -111,7 +111,7 @@ public class PartitionsTable extends BaseMetadataTable {
 
     // use an inclusive projection to remove the partition name prefix and filter out any non-partition expressions
     Expression partitionFilter = Projections
-        .inclusive(transformSpec(scan.schema(), table.spec(), PARTITION_FIELD_PREFIX), caseSensitive)
+        .inclusive(transformSpec(scan.schema(), table.spec()), caseSensitive)
         .project(scan.filter());
 
     ManifestGroup manifestGroup = new ManifestGroup(table.io(), snapshot.dataManifests(), snapshot.deleteManifests())
diff --git a/core/src/test/java/org/apache/iceberg/TestMetadataTableScans.java b/core/src/test/java/org/apache/iceberg/TestMetadataTableScans.java
index af99014df6..3bc4c43860 100644
--- a/core/src/test/java/org/apache/iceberg/TestMetadataTableScans.java
+++ b/core/src/test/java/org/apache/iceberg/TestMetadataTableScans.java
@@ -34,6 +34,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.Streams;
 import org.apache.iceberg.types.Conversions;
 import org.apache.iceberg.types.Types;
 import org.junit.Assert;
+import org.junit.Assume;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
@@ -713,6 +714,209 @@ public class TestMetadataTableScans extends TableTestBase {
     validateIncludesPartitionScan(tasksAndEq, 0);
   }
 
+  @Test
+  public void testFilesTablePartitionFieldRemovalV1() {
+    Assume.assumeTrue(formatVersion == 1);
+    preparePartitionedTable();
+
+    // Change spec (replace the data bucket field with an id field) and add two data files
+    table.updateSpec()
+        .removeField(Expressions.bucket("data", 16))
+        .addField("id")
+        .commit();
+    PartitionSpec newSpec = table.spec();
+
+    // Add two data files with new spec; each file gets its own partition key (id=10 and id=11)
+    PartitionKey data10Key = new PartitionKey(newSpec, table.schema());
+    data10Key.set(1, 10);
+    DataFile data10 = DataFiles.builder(newSpec)
+        .withPath("/path/to/data-10.parquet")
+        .withRecordCount(10)
+        .withFileSizeInBytes(10)
+        .withPartition(data10Key)
+        .build();
+    PartitionKey data11Key = new PartitionKey(newSpec, table.schema());
+    data11Key.set(1, 11);
+    DataFile data11 = DataFiles.builder(newSpec)
+        .withPath("/path/to/data-11.parquet")
+        .withRecordCount(10)
+        .withFileSizeInBytes(10)
+        .withPartition(data11Key)
+        .build();
+
+    table.newFastAppend().appendFile(data10).commit();
+    table.newFastAppend().appendFile(data11).commit();
+
+    Table metadataTable = new DataFilesTable(table.ops(), table);
+    Expression filter = Expressions.and(
+        Expressions.equal("partition.id", 10),
+        Expressions.greaterThan("record_count", 0));
+    TableScan scan = metadataTable.newScan().filter(filter);
+    CloseableIterable<FileScanTask> tasks = scan.planFiles();
+
+    // All 4 original data files written by old spec, plus one data file written by new spec
+    Assert.assertEquals(5, Iterables.size(tasks));
+
+    filter = Expressions.and(
+        Expressions.equal("partition.data_bucket", 0),
+        Expressions.greaterThan("record_count", 0));
+    scan = metadataTable.newScan().filter(filter);
+    tasks = scan.planFiles();
+
+    // 1 original data file written by old spec (V1 filters out new specs which don't have this value)
+    Assert.assertEquals(1, Iterables.size(tasks));
+  }
+
+  @Test
+  public void testFilesTablePartitionFieldRemovalV2() {
+    Assume.assumeTrue(formatVersion == 2);
+    preparePartitionedTable();
+
+    // Change spec (replace the data bucket field with an id field) and add two data files
+    table.updateSpec()
+        .removeField(Expressions.bucket("data", 16))
+        .addField("id").commit();
+    PartitionSpec newSpec = table.spec();
+
+    // Add two data files with new spec
+    DataFile data10 = DataFiles.builder(newSpec)
+        .withPath("/path/to/data-10.parquet")
+        .withRecordCount(10)
+        .withFileSizeInBytes(10)
+        .withPartitionPath("id=10")
+        .build();
+    DataFile data11 = DataFiles.builder(newSpec)
+        .withPath("/path/to/data-11.parquet")
+        .withRecordCount(10)
+        .withFileSizeInBytes(10)
+        .withPartitionPath("id=11")
+        .build();
+
+    table.newFastAppend().appendFile(data10).commit();
+    table.newFastAppend().appendFile(data11).commit();
+
+    Table metadataTable = new DataFilesTable(table.ops(), table);
+    Expression filter = Expressions.and(
+        Expressions.equal("partition.id", 10),
+        Expressions.greaterThan("record_count", 0));
+    TableScan scan = metadataTable.newScan().filter(filter);
+    CloseableIterable<FileScanTask> tasks = scan.planFiles();
+
+    // All 4 original data files written by old spec, plus one new data file written by new spec
+    Assert.assertEquals(5, Iterables.size(tasks));
+
+    filter = Expressions.and(
+        Expressions.equal("partition.data_bucket", 0),
+        Expressions.greaterThan("record_count", 0));
+    scan = metadataTable.newScan().filter(filter);
+    tasks = scan.planFiles();
+
+    // 1 original data file written by old spec, plus both new data files written by new spec
+    Assert.assertEquals(3, Iterables.size(tasks));
+  }
+
+  @Test
+  public void testFilesTablePartitionFieldAddV1() {
+    Assume.assumeTrue(formatVersion == 1);
+    preparePartitionedTable();
+
+    // Change spec (add an id field next to the existing data bucket field) and add two data files
+    table.updateSpec()
+        .addField("id")
+        .commit();
+    PartitionSpec newSpec = table.spec();
+
+    // Add two data files with new spec; each file gets its own fully populated partition key
+    PartitionKey data10Key = new PartitionKey(newSpec, table.schema());
+    data10Key.set(0, 0); // data=0
+    data10Key.set(1, 10); // id=10
+    DataFile data10 = DataFiles.builder(newSpec)
+        .withPath("/path/to/data-10.parquet")
+        .withRecordCount(10)
+        .withFileSizeInBytes(10)
+        .withPartition(data10Key)
+        .build();
+    PartitionKey data11Key = new PartitionKey(newSpec, table.schema());
+    data11Key.set(0, 1); // data=1
+    data11Key.set(1, 11); // id=11
+    DataFile data11 = DataFiles.builder(newSpec)
+        .withPath("/path/to/data-11.parquet")
+        .withRecordCount(10)
+        .withFileSizeInBytes(10)
+        .withPartition(data11Key)
+        .build();
+
+    table.newFastAppend().appendFile(data10).commit();
+    table.newFastAppend().appendFile(data11).commit();
+
+    Table metadataTable = new DataFilesTable(table.ops(), table);
+    Expression filter = Expressions.and(
+        Expressions.equal("partition.id", 10),
+        Expressions.greaterThan("record_count", 0));
+    TableScan scan = metadataTable.newScan().filter(filter);
+    CloseableIterable<FileScanTask> tasks = scan.planFiles();
+
+    // All 4 original data files written by old spec, plus one new data file written by new spec
+    Assert.assertEquals(5, Iterables.size(tasks));
+
+    filter = Expressions.and(
+        Expressions.equal("partition.data_bucket", 0),
+        Expressions.greaterThan("record_count", 0));
+    scan = metadataTable.newScan().filter(filter);
+    tasks = scan.planFiles();
+
+    // 1 original data file written by old spec, plus 1 new data file written by new spec
+    Assert.assertEquals(2, Iterables.size(tasks));
+  }
+
+  @Test
+  public void testPartitionSpecEvolutionAdditiveV2() {
+    Assume.assumeTrue(formatVersion == 2);
+    preparePartitionedTable();
+
+    // Change spec (add an id field next to the existing data bucket field) and add two data files
+    table.updateSpec()
+        .addField("id")
+        .commit();
+    PartitionSpec newSpec = table.spec();
+
+    // Add two data files with new spec
+    DataFile data10 = DataFiles.builder(newSpec)
+        .withPath("/path/to/data-10.parquet")
+        .withRecordCount(10)
+        .withFileSizeInBytes(10)
+        .withPartitionPath("data_bucket=0/id=10")
+        .build();
+    DataFile data11 = DataFiles.builder(newSpec)
+        .withPath("/path/to/data-11.parquet")
+        .withRecordCount(10)
+        .withFileSizeInBytes(10)
+        .withPartitionPath("data_bucket=1/id=11")
+        .build();
+
+    table.newFastAppend().appendFile(data10).commit();
+    table.newFastAppend().appendFile(data11).commit();
+
+    Table metadataTable = new DataFilesTable(table.ops(), table);
+    Expression filter = Expressions.and(
+        Expressions.equal("partition.id", 10),
+        Expressions.greaterThan("record_count", 0));
+    TableScan scan = metadataTable.newScan().filter(filter);
+    CloseableIterable<FileScanTask> tasks = scan.planFiles();
+
+    // All 4 original data files written by old spec, plus one new data file written by new spec
+    Assert.assertEquals(5, Iterables.size(tasks));
+
+    filter = Expressions.and(
+        Expressions.equal("partition.data_bucket", 0),
+        Expressions.greaterThan("record_count", 0));
+    scan = metadataTable.newScan().filter(filter);
+    tasks = scan.planFiles();
+
+    // 1 original data file written by old spec, plus 1 new data file written by new spec
+    Assert.assertEquals(2, Iterables.size(tasks));
+  }
+
   private void validateTaskScanResiduals(TableScan scan, boolean ignoreResiduals) throws IOException {
     try (CloseableIterable<CombinedScanTask> tasks = scan.planTasks()) {
       Assert.assertTrue("Tasks should not be empty", Iterables.size(tasks) > 0);
diff --git a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestMetadataTablesWithPartitionEvolution.java b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestMetadataTablesWithPartitionEvolution.java
index 0e690bd27d..a93fef0bad 100644
--- a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestMetadataTablesWithPartitionEvolution.java
+++ b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestMetadataTablesWithPartitionEvolution.java
@@ -22,8 +22,6 @@ package org.apache.iceberg.spark.source;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
-import java.util.Random;
-import java.util.concurrent.ThreadLocalRandom;
 import org.apache.iceberg.AssertHelpers;
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.HasTableOperations;
@@ -74,14 +72,40 @@ public class TestMetadataTablesWithPartitionEvolution extends SparkCatalogTestBa
                 "default-namespace", "default"
             ),
             ORC,
-            formatVersion()
+            1
+        },
+        { "testhive", SparkCatalog.class.getName(),
+            ImmutableMap.of(
+                "type", "hive",
+                "default-namespace", "default"
+            ),
+            ORC,
+            2
+        },
+        { "testhadoop", SparkCatalog.class.getName(),
+            ImmutableMap.of(
+                "type", "hadoop"
+            ),
+            PARQUET,
+            1
         },
         { "testhadoop", SparkCatalog.class.getName(),
             ImmutableMap.of(
                 "type", "hadoop"
             ),
             PARQUET,
-            formatVersion()
+            2
+        },
+        { "spark_catalog", SparkSessionCatalog.class.getName(),
+            ImmutableMap.of(
+                "type", "hive",
+                "default-namespace", "default",
+                "clients", "1",
+                "parquet-enabled", "false",
+                "cache-enabled", "false" // Spark will delete tables using v1, leaving the cache out of sync
+            ),
+            AVRO,
+            1
         },
         { "spark_catalog", SparkSessionCatalog.class.getName(),
             ImmutableMap.of(
@@ -92,17 +116,11 @@ public class TestMetadataTablesWithPartitionEvolution extends SparkCatalogTestBa
                 "cache-enabled", "false" // Spark will delete tables using v1, leaving the cache out of sync
             ),
             AVRO,
-            formatVersion()
+            2
         }
     };
   }
 
-  private static int formatVersion() {
-    return RANDOM.nextInt(2) + 1;
-  }
-
-  private static final Random RANDOM = ThreadLocalRandom.current();
-
   private final FileFormat fileFormat;
   private final int formatVersion;
 
@@ -189,6 +207,106 @@ public class TestMetadataTablesWithPartitionEvolution extends SparkCatalogTestBa
     }
   }
 
+  @Test
+  public void testFilesMetadataTableFilter() throws ParseException {
+    sql("CREATE TABLE %s (id bigint NOT NULL, category string, data string) USING iceberg " +
+        "TBLPROPERTIES ('commit.manifest-merge.enabled' 'false')", tableName);
+    initTable();
+
+    sql("INSERT INTO TABLE %s VALUES (1, 'c1', 'd1')", tableName);
+    sql("INSERT INTO TABLE %s VALUES (2, 'c2', 'd2')", tableName);
+
+    // verify the metadata tables while the current spec is still unpartitioned
+    for (MetadataTableType tableType : Arrays.asList(FILES, ALL_DATA_FILES)) {
+      Dataset<Row> df = loadMetadataTable(tableType);
+      Assert.assertTrue("Partition must be skipped", df.schema().getFieldIndex("partition").isEmpty());
+    }
+
+    Table table = validationCatalog.loadTable(tableIdent);
+
+    table.updateSpec()
+        .addField("data")
+        .commit();
+    sql("REFRESH TABLE %s", tableName);
+    sql("INSERT INTO TABLE %s VALUES (1, 'c1', 'd1')", tableName);
+    sql("INSERT INTO TABLE %s VALUES (2, 'c2', 'd2')", tableName);
+
+    // verify the metadata tables after adding the first partition column
+    for (MetadataTableType tableType : Arrays.asList(FILES, ALL_DATA_FILES)) {
+      assertPartitions(
+          ImmutableList.of(row("d2")),
+          "STRUCT<data:STRING>",
+          tableType,
+          "partition.data = 'd2'");
+    }
+
+    table.updateSpec()
+        .addField("category")
+        .commit();
+    sql("REFRESH TABLE %s", tableName);
+    sql("INSERT INTO TABLE %s VALUES (1, 'c1', 'd1')", tableName);
+    sql("INSERT INTO TABLE %s VALUES (2, 'c2', 'd2')", tableName);
+
+    // verify the metadata tables after adding the second partition column
+    for (MetadataTableType tableType : Arrays.asList(FILES, ALL_DATA_FILES)) {
+      assertPartitions(ImmutableList.of(row("d2", null), row("d2", "c2")),
+          "STRUCT<data:STRING,category:STRING>",
+          tableType,
+          "partition.data = 'd2'");
+    }
+    for (MetadataTableType tableType : Arrays.asList(FILES, ALL_DATA_FILES)) {
+      assertPartitions(
+          ImmutableList.of(row("d2", "c2")),
+          "STRUCT<data:STRING,category:STRING>",
+          tableType,
+          "partition.category = 'c2'");
+    }
+
+    table.updateSpec()
+        .removeField("data")
+        .commit();
+    sql("REFRESH TABLE %s", tableName);
+
+    // Verify new partitions do not show up for removed 'partition.data=d2' query
+    sql("INSERT INTO TABLE %s VALUES (3, 'c3', 'd2')", tableName);
+    sql("INSERT INTO TABLE %s VALUES (4, 'c4', 'd2')", tableName);
+
+    // Verify new partitions do show up for 'partition.category=c2' query
+    sql("INSERT INTO TABLE %s VALUES (5, 'c2', 'd5')", tableName);
+
+    // no new partition should show up for 'data' partition query as partition field has been removed
+    for (MetadataTableType tableType : Arrays.asList(FILES, ALL_DATA_FILES)) {
+      assertPartitions(
+          ImmutableList.of(row("d2", null), row("d2", "c2")),
+          "STRUCT<data:STRING,category:STRING>",
+          tableType,
+          "partition.data = 'd2'");
+    }
+    // new partition shows up from 'category' partition field query
+    for (MetadataTableType tableType : Arrays.asList(FILES, ALL_DATA_FILES)) {
+      assertPartitions(
+          ImmutableList.of(row(null, "c2"), row("d2", "c2")),
+          "STRUCT<data:STRING,category:STRING>",
+          tableType,
+          "partition.category = 'c2'");
+    }
+
+    table.updateSpec()
+        .renameField("category", "category_another_name")
+        .commit();
+    sql("REFRESH TABLE %s", tableName);
+
+    // Verify new partitions do show up for 'category=c2' query
+    sql("INSERT INTO TABLE %s VALUES (6, 'c2', 'd6')", tableName);
+    for (MetadataTableType tableType : Arrays.asList(FILES, ALL_DATA_FILES)) {
+      assertPartitions(
+          ImmutableList.of(row(null, "c2"), row(null, "c2"), row("d2", "c2")),
+          "STRUCT<data:STRING,category_another_name:STRING>",
+          tableType,
+          "partition.category_another_name = 'c2'");
+    }
+  }
+
   @Test
   public void testEntriesMetadataTable() throws ParseException {
     sql("CREATE TABLE %s (id bigint NOT NULL, category string, data string) USING iceberg", tableName);
@@ -299,7 +417,15 @@ public class TestMetadataTablesWithPartitionEvolution extends SparkCatalogTestBa
 
   private void assertPartitions(List<Object[]> expectedPartitions, String expectedTypeAsString,
                                 MetadataTableType tableType) throws ParseException {
+    assertPartitions(expectedPartitions, expectedTypeAsString, tableType, null);
+  }
+
+  private void assertPartitions(List<Object[]> expectedPartitions, String expectedTypeAsString,
+                                MetadataTableType tableType, String filter) throws ParseException {
     Dataset<Row> df = loadMetadataTable(tableType);
+    if (filter != null) {
+      df = df.filter(filter);
+    }
 
     DataType expectedType = spark.sessionState().sqlParser().parseDataType(expectedTypeAsString);
     switch (tableType) {