You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by ru...@apache.org on 2021/04/15 00:03:52 UTC

[iceberg] branch master updated: [Build] Fix Antlr Shadowing and add Integration Tests (#2428)

This is an automated email from the ASF dual-hosted git repository.

russellspitzer pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 1884bee  [Build] Fix Antlr Shadowing and add Integration Tests (#2428)
1884bee is described below

commit 1884beeec2e4d48c8627c320a4612320f716af7f
Author: Russell Spitzer <ru...@GMAIL.COM>
AuthorDate: Wed Apr 14 19:03:38 2021 -0500

    [Build] Fix Antlr Shadowing and add Integration Tests (#2428)
    
    * Reverts Antlr Shadowing and adds Integration Test to Check Proper Shading
    
    Previously we had no test suites which checked if the Spark3Runtime jar was actually
    usable with Spark3 builds. To check that the shadowJar is doing the right thing we
    add a new integration test which runs using only Spark3 and the ShadowJar. Only a
    few tests are included to prove that the jar is stable while we still rely on our
    unit tests for the majority of test coverage.
    
    * Shades Antlr Runtime
    
    This reintroduces the shading of Antlr along with a set of integration tests to verify that it is correctly
    shaded and working as expected. To accomplish this we copy several utility classes from Apache Spark so we
    can break our dependency on Spark Internal's accessing Antlr classes.
---
 .github/workflows/java-ci.yml                      |   2 +-
 build.gradle                                       |  33 ++++-
 .../IcebergSparkSqlExtensionsParser.scala          | 115 +++++++++++++--
 .../IcebergSqlExtensionsAstBuilder.scala           |  33 ++++-
 .../spark/extensions/TestCallStatementParser.java  |   3 +-
 .../java/org/apache/iceberg/spark/SmokeTest.java   | 160 +++++++++++++++++++++
 6 files changed, 330 insertions(+), 16 deletions(-)

diff --git a/.github/workflows/java-ci.yml b/.github/workflows/java-ci.yml
index 6911135..8b38dfb 100644
--- a/.github/workflows/java-ci.yml
+++ b/.github/workflows/java-ci.yml
@@ -64,7 +64,7 @@ jobs:
     - uses: actions/setup-java@v1
       with:
         java-version: 8
-    - run: ./gradlew build -x test -x javadoc
+    - run: ./gradlew build -x test -x javadoc -x integrationTest
 
   build-javadoc:
     runs-on: ubuntu-latest
diff --git a/build.gradle b/build.gradle
index 3f37e89..e2a95d4 100644
--- a/build.gradle
+++ b/build.gradle
@@ -985,6 +985,7 @@ project(":iceberg-spark3-extensions") {
     testCompile project(path: ':iceberg-spark3', configuration: 'testArtifacts')
 
     // Required because we remove antlr plugin dependencies from the compile configuration, see note above
+    // We shade this in Spark3 Runtime to avoid issues with Spark's Antlr Runtime
     runtime "org.antlr:antlr4-runtime:4.7.1"
     antlr "org.antlr:antlr4:4.7.1"
   }
@@ -1000,6 +1001,13 @@ project(':iceberg-spark3-runtime') {
 
   tasks.jar.dependsOn tasks.shadowJar
 
+  sourceSets {
+    integration {
+      java.srcDir "$projectDir/src/integration/java"
+      resources.srcDir "$projectDir/src/integration/resources"
+    }
+  }
+
   configurations {
     compile {
       exclude group: 'org.apache.spark'
@@ -1021,6 +1029,18 @@ project(':iceberg-spark3-runtime') {
     compile(project(':iceberg-nessie')) {
       exclude group: 'com.google.code.findbugs', module: 'jsr305'
     }
+
+    integrationImplementation 'org.apache.spark:spark-hive_2.12'
+    integrationImplementation 'junit:junit'
+    integrationImplementation 'org.slf4j:slf4j-simple'
+    integrationImplementation project(path: ':iceberg-api', configuration: 'testArtifacts')
+    integrationImplementation project(path: ':iceberg-hive-metastore', configuration: 'testArtifacts')
+    integrationImplementation project(path: ':iceberg-spark', configuration: 'testArtifacts')
+    integrationImplementation project(path: ':iceberg-spark3', configuration: 'testArtifacts')
+    integrationImplementation project(path: ':iceberg-spark3-extensions', configuration: 'testArtifacts')
+    // Not allowed on our classpath, only the runtime jar is allowed
+    integrationCompileOnly project(':iceberg-spark3-extensions')
+    integrationCompileOnly project(':iceberg-spark3')
   }
 
   shadowJar {
@@ -1038,7 +1058,6 @@ project(':iceberg-spark3-runtime') {
     relocate 'com.google', 'org.apache.iceberg.shaded.com.google'
     relocate 'com.fasterxml', 'org.apache.iceberg.shaded.com.fasterxml'
     relocate 'com.github.benmanes', 'org.apache.iceberg.shaded.com.github.benmanes'
-    relocate 'org.antlr.v4.runtime', 'org.apache.iceberg.shaded.org.antlr.v4.runtime'
     relocate 'org.checkerframework', 'org.apache.iceberg.shaded.org.checkerframework'
     relocate 'org.apache.avro', 'org.apache.iceberg.shaded.org.apache.avro'
     relocate 'avro.shaded', 'org.apache.iceberg.shaded.org.apache.avro.shaded'
@@ -1054,10 +1073,22 @@ project(':iceberg-spark3-runtime') {
     relocate 'org.apache.arrow', 'org.apache.iceberg.shaded.org.apache.arrow'
     relocate 'com.carrotsearch', 'org.apache.iceberg.shaded.com.carrotsearch'
     relocate 'org.threeten.extra', 'org.apache.iceberg.shaded.org.threeten.extra'
+    // relocate Antlr runtime and related deps to shade Iceberg specific version
+    relocate 'org.antlr.v4.runtime', 'org.apache.iceberg.shaded.org.antlr.v4.runtime'
 
     classifier null
   }
 
+  task integrationTest(type: Test) {
+    description = "Test Spark3 Runtime Jar"
+    group = "verification"
+    testClassesDirs = sourceSets.integration.output.classesDirs
+    classpath = sourceSets.integration.runtimeClasspath + files(shadowJar.archiveFile.get().asFile.path)
+    inputs.file(shadowJar.archiveFile.get().asFile.path)
+  }
+  integrationTest.dependsOn shadowJar
+  check.dependsOn integrationTest
+
   jar {
     enabled = false
   }
diff --git a/spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/IcebergSparkSqlExtensionsParser.scala b/spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/IcebergSparkSqlExtensionsParser.scala
index c19401a..d99177f 100644
--- a/spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/IcebergSparkSqlExtensionsParser.scala
+++ b/spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/IcebergSparkSqlExtensionsParser.scala
@@ -22,16 +22,14 @@ package org.apache.spark.sql.catalyst.parser.extensions
 import java.util.Locale
 import org.antlr.v4.runtime._
 import org.antlr.v4.runtime.atn.PredictionMode
+import org.antlr.v4.runtime.misc.Interval
 import org.antlr.v4.runtime.misc.ParseCancellationException
 import org.antlr.v4.runtime.tree.TerminalNodeImpl
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.FunctionIdentifier
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.expressions.Expression
-import org.apache.spark.sql.catalyst.parser.ParseErrorListener
-import org.apache.spark.sql.catalyst.parser.ParseException
 import org.apache.spark.sql.catalyst.parser.ParserInterface
-import org.apache.spark.sql.catalyst.parser.UpperCaseCharStream
 import org.apache.spark.sql.catalyst.parser.extensions.IcebergSqlExtensionsParser.NonReservedContext
 import org.apache.spark.sql.catalyst.parser.extensions.IcebergSqlExtensionsParser.QuotedIdentifierContext
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
@@ -124,13 +122,13 @@ class IcebergSparkSqlExtensionsParser(delegate: ParserInterface) extends ParserI
   protected def parse[T](command: String)(toResult: IcebergSqlExtensionsParser => T): T = {
     val lexer = new IcebergSqlExtensionsLexer(new UpperCaseCharStream(CharStreams.fromString(command)))
     lexer.removeErrorListeners()
-    lexer.addErrorListener(ParseErrorListener)
+    lexer.addErrorListener(IcebergParseErrorListener)
 
     val tokenStream = new CommonTokenStream(lexer)
     val parser = new IcebergSqlExtensionsParser(tokenStream)
     parser.addParseListener(IcebergSqlExtensionsPostProcessor)
     parser.removeErrorListeners()
-    parser.addErrorListener(ParseErrorListener)
+    parser.addErrorListener(IcebergParseErrorListener)
 
     try {
       try {
@@ -150,22 +148,52 @@ class IcebergSparkSqlExtensionsParser(delegate: ParserInterface) extends ParserI
       }
     }
     catch {
-      case e: ParseException if e.command.isDefined =>
+      case e: IcebergParseException if e.command.isDefined =>
         throw e
-      case e: ParseException =>
+      case e: IcebergParseException =>
         throw e.withCommand(command)
       case e: AnalysisException =>
         val position = Origin(e.line, e.startPosition)
-        throw new ParseException(Option(command), e.message, position, position)
+        throw new IcebergParseException(Option(command), e.message, position, position)
     }
   }
 }
 
+/* Copied from Apache Spark's to avoid dependency on Spark Internals */
+class UpperCaseCharStream(wrapped: CodePointCharStream) extends CharStream {
+  override def consume(): Unit = wrapped.consume
+  override def getSourceName(): String = wrapped.getSourceName
+  override def index(): Int = wrapped.index
+  override def mark(): Int = wrapped.mark
+  override def release(marker: Int): Unit = wrapped.release(marker)
+  override def seek(where: Int): Unit = wrapped.seek(where)
+  override def size(): Int = wrapped.size
+
+  override def getText(interval: Interval): String = {
+    // ANTLR 4.7's CodePointCharStream implementations have bugs when
+    // getText() is called with an empty stream, or intervals where
+    // the start > end. See
+    // https://github.com/antlr/antlr4/commit/ac9f7530 for one fix
+    // that is not yet in a released ANTLR artifact.
+    if (size() > 0 && (interval.b - interval.a >= 0)) {
+      wrapped.getText(interval)
+    } else {
+      ""
+    }
+  }
+
+  // scalastyle:off
+  override def LA(i: Int): Int = {
+    val la = wrapped.LA(i)
+    if (la == 0 || la == IntStream.EOF) la
+    else Character.toUpperCase(la)
+  }
+  // scalastyle:on
+}
+
 /**
  * The post-processor validates & cleans-up the parse tree during the parse process.
  */
-// while we reuse ParseErrorListener and ParseException, we have to copy and modify PostProcessor
-// as it directly depends on classes generated from the extensions grammar file
 case object IcebergSqlExtensionsPostProcessor extends IcebergSqlExtensionsBaseListener {
 
   /** Remove the back ticks from an Identifier. */
@@ -198,3 +226,70 @@ case object IcebergSqlExtensionsPostProcessor extends IcebergSqlExtensionsBaseLi
     parent.addChild(new TerminalNodeImpl(f(newToken)))
   }
 }
+
+/* Partially copied from Apache Spark's Parser to avoid dependency on Spark Internals */
+case object IcebergParseErrorListener extends BaseErrorListener {
+  override def syntaxError(
+      recognizer: Recognizer[_, _],
+      offendingSymbol: scala.Any,
+      line: Int,
+      charPositionInLine: Int,
+      msg: String,
+      e: RecognitionException): Unit = {
+    val (start, stop) = offendingSymbol match {
+      case token: CommonToken =>
+        val start = Origin(Some(line), Some(token.getCharPositionInLine))
+        val length = token.getStopIndex - token.getStartIndex + 1
+        val stop = Origin(Some(line), Some(token.getCharPositionInLine + length))
+        (start, stop)
+      case _ =>
+        val start = Origin(Some(line), Some(charPositionInLine))
+        (start, start)
+    }
+    throw new IcebergParseException(None, msg, start, stop)
+  }
+}
+
+/**
+ * Copied from Apache Spark
+ * A [[ParseException]] is an [[AnalysisException]] that is thrown during the parse process. It
+ * contains fields and an extended error message that make reporting and diagnosing errors easier.
+ */
+class IcebergParseException(
+    val command: Option[String],
+    message: String,
+    val start: Origin,
+    val stop: Origin) extends AnalysisException(message, start.line, start.startPosition) {
+
+  def this(message: String, ctx: ParserRuleContext) = {
+    this(Option(IcebergParserUtils.command(ctx)),
+      message,
+      IcebergParserUtils.position(ctx.getStart),
+      IcebergParserUtils.position(ctx.getStop))
+  }
+
+  override def getMessage: String = {
+    val builder = new StringBuilder
+    builder ++= "\n" ++= message
+    start match {
+      case Origin(Some(l), Some(p)) =>
+        builder ++= s"(line $l, pos $p)\n"
+        command.foreach { cmd =>
+          val (above, below) = cmd.split("\n").splitAt(l)
+          builder ++= "\n== SQL ==\n"
+          above.foreach(builder ++= _ += '\n')
+          builder ++= (0 until p).map(_ => "-").mkString("") ++= "^^^\n"
+          below.foreach(builder ++= _ += '\n')
+        }
+      case _ =>
+        command.foreach { cmd =>
+          builder ++= "\n== SQL ==\n" ++= cmd
+        }
+    }
+    builder.toString
+  }
+
+  def withCommand(cmd: String): IcebergParseException = {
+    new IcebergParseException(Option(cmd), message, start, stop)
+  }
+}
diff --git a/spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/IcebergSqlExtensionsAstBuilder.scala b/spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/IcebergSqlExtensionsAstBuilder.scala
index 3d7a50e..8ecd1f0 100644
--- a/spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/IcebergSqlExtensionsAstBuilder.scala
+++ b/spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/IcebergSqlExtensionsAstBuilder.scala
@@ -20,6 +20,7 @@
 package org.apache.spark.sql.catalyst.parser.extensions
 
 import org.antlr.v4.runtime._
+import org.antlr.v4.runtime.misc.Interval
 import org.antlr.v4.runtime.tree.ParseTree
 import org.antlr.v4.runtime.tree.TerminalNode
 import org.apache.iceberg.DistributionMode
@@ -30,9 +31,8 @@ import org.apache.iceberg.spark.Spark3Util
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.expressions.Expression
 import org.apache.spark.sql.catalyst.expressions.Literal
-import org.apache.spark.sql.catalyst.parser.ParseException
 import org.apache.spark.sql.catalyst.parser.ParserInterface
-import org.apache.spark.sql.catalyst.parser.ParserUtils._
+import org.apache.spark.sql.catalyst.parser.extensions.IcebergParserUtils.withOrigin
 import org.apache.spark.sql.catalyst.parser.extensions.IcebergSqlExtensionsParser._
 import org.apache.spark.sql.catalyst.plans.logical.AddPartitionField
 import org.apache.spark.sql.catalyst.plans.logical.CallArgument
@@ -43,6 +43,8 @@ import org.apache.spark.sql.catalyst.plans.logical.NamedArgument
 import org.apache.spark.sql.catalyst.plans.logical.PositionalArgument
 import org.apache.spark.sql.catalyst.plans.logical.ReplacePartitionField
 import org.apache.spark.sql.catalyst.plans.logical.SetWriteDistributionAndOrdering
+import org.apache.spark.sql.catalyst.trees.CurrentOrigin
+import org.apache.spark.sql.catalyst.trees.Origin
 import org.apache.spark.sql.connector.expressions
 import org.apache.spark.sql.connector.expressions.ApplyTransform
 import org.apache.spark.sql.connector.expressions.FieldReference
@@ -182,7 +184,7 @@ class IcebergSqlExtensionsAstBuilder(delegate: ParserInterface) extends IcebergS
         .map(visitConstant)
         .map(lit => LiteralValue(lit.value, lit.dataType))
     reference.orElse(literal)
-        .getOrElse(throw new ParseException(s"Invalid transform argument", ctx))
+        .getOrElse(throw new IcebergParseException(s"Invalid transform argument", ctx))
   }
 
   /**
@@ -237,3 +239,28 @@ class IcebergSqlExtensionsAstBuilder(delegate: ParserInterface) extends IcebergS
     ctx.accept(this).asInstanceOf[T]
   }
 }
+
+/* Partially copied from Apache Spark's Parser to avoid dependency on Spark Internals */
+object IcebergParserUtils {
+
+  private[sql] def withOrigin[T](ctx: ParserRuleContext)(f: => T): T = {
+    val current = CurrentOrigin.get
+    CurrentOrigin.set(position(ctx.getStart))
+    try {
+      f
+    } finally {
+      CurrentOrigin.set(current)
+    }
+  }
+
+  private[sql] def position(token: Token): Origin = {
+    val opt = Option(token)
+    Origin(opt.map(_.getLine), opt.map(_.getCharPositionInLine))
+  }
+
+  /** Get the command which created the token. */
+  private[sql] def command(ctx: ParserRuleContext): String = {
+    val stream = ctx.getStart.getInputStream
+    stream.getText(Interval.of(0, stream.size() - 1))
+  }
+}
diff --git a/spark3-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCallStatementParser.java b/spark3-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCallStatementParser.java
index 483b03b..99e9d06 100644
--- a/spark3-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCallStatementParser.java
+++ b/spark3-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCallStatementParser.java
@@ -30,6 +30,7 @@ import org.apache.spark.sql.catalyst.expressions.Literal;
 import org.apache.spark.sql.catalyst.expressions.Literal$;
 import org.apache.spark.sql.catalyst.parser.ParseException;
 import org.apache.spark.sql.catalyst.parser.ParserInterface;
+import org.apache.spark.sql.catalyst.parser.extensions.IcebergParseException;
 import org.apache.spark.sql.catalyst.plans.logical.CallArgument;
 import org.apache.spark.sql.catalyst.plans.logical.CallStatement;
 import org.apache.spark.sql.catalyst.plans.logical.NamedArgument;
@@ -131,7 +132,7 @@ public class TestCallStatementParser {
 
   @Test
   public void testCallParseError() {
-    AssertHelpers.assertThrows("Should fail with a sensible parse error", ParseException.class,
+    AssertHelpers.assertThrows("Should fail with a sensible parse error", IcebergParseException.class,
         "missing '(' at 'radish'",
         () -> parser.parsePlan("CALL cat.system radish kebab"));
   }
diff --git a/spark3-runtime/src/integration/java/org/apache/iceberg/spark/SmokeTest.java b/spark3-runtime/src/integration/java/org/apache/iceberg/spark/SmokeTest.java
new file mode 100644
index 0000000..40c8b27
--- /dev/null
+++ b/spark3-runtime/src/integration/java/org/apache/iceberg/spark/SmokeTest.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark;
+
+import java.io.IOException;
+import java.util.Map;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.spark.extensions.SparkExtensionsTestBase;
+import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class SmokeTest extends SparkExtensionsTestBase {
+
+  public SmokeTest(String catalogName, String implementation, Map<String, String> config) {
+    super(catalogName, implementation, config);
+  }
+
+  @Before
+  public void dropTable() {
+    sql("DROP TABLE IF EXISTS %s", tableName);
+  }
+
+  // Run through our Doc's Getting Started Example
+  // TODO Update doc example so that it can actually be run, modifications were required for this test suite to run
+  @Test
+  public void testGettingStarted() throws IOException {
+    // Creating a table
+    sql("CREATE TABLE %s (id bigint, data string) USING iceberg", tableName);
+
+    // Writing
+    sql("INSERT INTO %s VALUES (1, 'a'), (2, 'b'), (3, 'c')", tableName);
+    Assert.assertEquals("Should have inserted 3 rows",
+        3L, scalarSql("SELECT COUNT(*) FROM %s", tableName));
+
+    sql("DROP TABLE IF EXISTS source");
+    sql("CREATE TABLE source (id bigint, data string) USING parquet LOCATION '%s'", temp.newFolder());
+    sql("INSERT INTO source VALUES (10, 'd'), (11, 'ee')");
+
+    sql("INSERT INTO %s SELECT id, data FROM source WHERE length(data) = 1", tableName);
+    Assert.assertEquals("Table should now have 4 rows",
+        4L, scalarSql("SELECT COUNT(*) FROM %s", tableName));
+
+    sql("DROP TABLE IF EXISTS updates");
+    sql("CREATE TABLE updates (id bigint, data string) USING parquet LOCATION '%s'", temp.newFolder());
+    sql("INSERT INTO updates VALUES (1, 'x'), (2, 'x'), (4, 'z')");
+
+    sql("MERGE INTO %s t USING (SELECT * FROM updates) u ON t.id = u.id\n" +
+        "WHEN MATCHED THEN UPDATE SET t.data = u.data\n" +
+        "WHEN NOT MATCHED THEN INSERT *", tableName);
+    Assert.assertEquals("Table should now have 5 rows",
+        5L, scalarSql("SELECT COUNT(*) FROM %s", tableName));
+    Assert.assertEquals("Record 1 should now have data x",
+        "x", scalarSql("SELECT data FROM %s WHERE id = 1", tableName));
+
+    // Reading
+    Assert.assertEquals("There should be 2 records with data x",
+        2L, scalarSql(
+            "SELECT count(1) as count FROM %s WHERE data = 'x' GROUP BY data ", tableName));
+
+    // Not supported because of Spark limitation
+    if (!catalogName.equals("spark_catalog")) {
+      Assert.assertEquals("There should be 3 snapshots",
+          3L, scalarSql("SELECT COUNT(*) FROM %s.snapshots", tableName));
+    }
+  }
+
+  // From Spark DDL Docs section
+  @Test
+  public void testAlterTable() throws NoSuchTableException {
+    sql("CREATE TABLE %s (category int, id bigint, data string, ts timestamp) USING iceberg", tableName);
+    Table table = getTable();
+    // Add examples
+    sql("ALTER TABLE %s ADD PARTITION FIELD bucket(16, id)", tableName);
+    sql("ALTER TABLE %s ADD PARTITION FIELD truncate(data, 4)", tableName);
+    sql("ALTER TABLE %s ADD PARTITION FIELD years(ts)", tableName);
+    sql("ALTER TABLE %s ADD PARTITION FIELD bucket(16, category) AS shard", tableName);
+    table = getTable();
+    Assert.assertEquals("Table should have 4 partition fields", 4, table.spec().fields().size());
+
+    // Drop Examples
+    sql("ALTER TABLE %s DROP PARTITION FIELD bucket(16, id)", tableName);
+    sql("ALTER TABLE %s DROP PARTITION FIELD truncate(data, 4)", tableName);
+    sql("ALTER TABLE %s DROP PARTITION FIELD years(ts)", tableName);
+    sql("ALTER TABLE %s DROP PARTITION FIELD shard", tableName);
+
+    table = getTable();
+    Assert.assertEquals("Table should have 4 partition fields", 4, table.spec().fields().size());
+    // VoidTransform is package private so we can't reach it here, just checking name
+    Assert.assertTrue("All transforms should be void",
+        table.spec().fields().stream().allMatch(pf -> pf.transform().toString().equals("void")));
+
+    // Sort order examples
+    sql("ALTER TABLE %s WRITE ORDERED BY category, id", tableName);
+    sql("ALTER TABLE %s WRITE ORDERED BY category ASC, id DESC", tableName);
+    sql("ALTER TABLE %s WRITE ORDERED BY category ASC NULLS LAST, id DESC NULLS FIRST", tableName);
+    table = getTable();
+    Assert.assertEquals("Table should be sorted on 2 fields", 2, table.sortOrder().fields().size());
+  }
+
+  @Test
+  public void testCreateTable() {
+    sql("DROP TABLE IF EXISTS %s", tableName("first"));
+    sql("DROP TABLE IF EXISTS %s", tableName("second"));
+    sql("DROP TABLE IF EXISTS %s", tableName("third"));
+
+    sql("CREATE TABLE %s (\n" +
+        "    id bigint COMMENT 'unique id',\n" +
+        "    data string)\n" +
+        "USING iceberg", tableName("first"));
+    getTable("first"); // Table should exist
+
+    sql("CREATE TABLE %s (\n" +
+        "    id bigint,\n" +
+        "    data string,\n" +
+        "    category string)\n" +
+        "USING iceberg\n" +
+        "PARTITIONED BY (category)", tableName("second"));
+    Table second = getTable("second");
+    Assert.assertEquals("Should be partitioned on 1 column", 1, second.spec().fields().size());
+
+    sql("CREATE TABLE %s (\n" +
+        "    id bigint,\n" +
+        "    data string,\n" +
+        "    category string,\n" +
+        "    ts timestamp)\n" +
+        "USING iceberg\n" +
+        "PARTITIONED BY (bucket(16, id), days(ts), category)", tableName("third"));
+    Table third = getTable("third");
+    Assert.assertEquals("Should be partitioned on 3 columns", 3, third.spec().fields().size());
+  }
+
+  private Table getTable(String name) {
+    return validationCatalog.loadTable(TableIdentifier.of("default", name));
+  }
+
+  private Table getTable() {
+    return validationCatalog.loadTable(tableIdent);
+  }
+
+}