Posted to commits@iceberg.apache.org by am...@apache.org on 2023/05/20 18:30:53 UTC

[iceberg] branch master updated: Spark 3.2: backport Spark SQL extension on create/update/drop tags (#7662)

This is an automated email from the ASF dual-hosted git repository.

amoghj pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new d8fc5b3c0a Spark 3.2: backport Spark SQL extension on create/update/drop tags (#7662)
d8fc5b3c0a is described below

commit d8fc5b3c0ae26ee673f2f9007d6e3ff7f7386c38
Author: Hongyue/Steve Zhang <st...@gmail.com>
AuthorDate: Sat May 20 11:30:46 2023 -0700

    Spark 3.2: backport Spark SQL extension on create/update/drop tags (#7662)
    
    * Spark 3.2: backport Spark SQL extension on create/update/drop tags
    ---------
    
    Co-authored-by: Steve Zhang <ho...@apple.com>
---
 .../IcebergSqlExtensions.g4                        |  20 +-
 .../IcebergSparkSqlExtensionsParser.scala          |   5 +-
 .../IcebergSqlExtensionsAstBuilder.scala           |  39 +++
 .../plans/logical/CreateOrReplaceTag.scala         |  38 +++
 .../spark/sql/catalyst/plans/logical/DropTag.scala |  33 ++
 .../sql/catalyst/plans/logical/TagOptions.scala    |  22 ++
 .../datasources/v2/CreateOrReplaceTagExec.scala    |  80 +++++
 .../sql/execution/datasources/v2/DropTagExec.scala |  56 +++
 .../v2/ExtendedDataSourceV2Strategy.scala          |  10 +-
 .../iceberg/spark/extensions/TestTagDDL.java       | 374 +++++++++++++++++++++
 10 files changed, 673 insertions(+), 4 deletions(-)
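
In SQL terms, the backport enables statements of the following shape. This is
an illustrative sketch, not part of the commit: it assumes a SparkSession
configured with IcebergSparkSessionExtensions and a hypothetical Iceberg
table db.sample that already has snapshots.

    // Scala sketch; table name, tag name, and snapshot id are illustrative.
    spark.sql("ALTER TABLE db.sample CREATE TAG t1 AS OF VERSION 8 RETAIN 365 DAYS")
    spark.sql("ALTER TABLE db.sample CREATE TAG IF NOT EXISTS t1")
    spark.sql("ALTER TABLE db.sample REPLACE TAG t1 AS OF VERSION 8")
    spark.sql("ALTER TABLE db.sample CREATE OR REPLACE TAG t1")
    spark.sql("ALTER TABLE db.sample DROP TAG IF EXISTS t1")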

diff --git a/spark/v3.2/spark-extensions/src/main/antlr/org.apache.spark.sql.catalyst.parser.extensions/IcebergSqlExtensions.g4 b/spark/v3.2/spark-extensions/src/main/antlr/org.apache.spark.sql.catalyst.parser.extensions/IcebergSqlExtensions.g4
index e614ee9ed3..7bd556acc5 100644
--- a/spark/v3.2/spark-extensions/src/main/antlr/org.apache.spark.sql.catalyst.parser.extensions/IcebergSqlExtensions.g4
+++ b/spark/v3.2/spark-extensions/src/main/antlr/org.apache.spark.sql.catalyst.parser.extensions/IcebergSqlExtensions.g4
@@ -74,7 +74,14 @@ statement
     | ALTER TABLE multipartIdentifier SET IDENTIFIER_KW FIELDS fieldList                    #setIdentifierFields
     | ALTER TABLE multipartIdentifier DROP IDENTIFIER_KW FIELDS fieldList                   #dropIdentifierFields
     | ALTER TABLE multipartIdentifier createReplaceBranchClause                             #createOrReplaceBranch
+    | ALTER TABLE multipartIdentifier createReplaceTagClause                                #createOrReplaceTag
     | ALTER TABLE multipartIdentifier DROP BRANCH (IF EXISTS)? identifier                   #dropBranch
+    | ALTER TABLE multipartIdentifier DROP TAG (IF EXISTS)? identifier                      #dropTag
+    ;
+
+createReplaceTagClause
+    : (CREATE OR)? REPLACE TAG identifier tagOptions
+    | CREATE TAG (IF NOT EXISTS)? identifier tagOptions
     ;
 
 createReplaceBranchClause
@@ -82,6 +89,10 @@ createReplaceBranchClause
     | CREATE BRANCH (IF NOT EXISTS)? identifier branchOptions
     ;
 
+tagOptions
+    : (AS OF VERSION snapshotId)? (refRetain)?
+    ;
+
 branchOptions
     : (AS OF VERSION snapshotId)? (refRetain)? (snapshotRetention)?
     ;
@@ -203,8 +214,8 @@ fieldList
 
 nonReserved
     : ADD | ALTER | AS | ASC | BRANCH | BY | CALL | CREATE | DAYS | DESC | DROP | EXISTS | FIELD | FIRST | HOURS | IF | LAST | NOT | NULLS | OF | OR | ORDERED | PARTITION | TABLE | WRITE
-    | DISTRIBUTED | LOCALLY | MINUTES | MONTHS | UNORDERED | REPLACE | RETAIN | RETENTION | VERSION | WITH | IDENTIFIER_KW | FIELDS | SET | SNAPSHOT | SNAPSHOTS
-    | TRUE | FALSE
+    | DISTRIBUTED | LOCALLY | MINUTES | MONTHS | UNORDERED | REPLACE | RETAIN | VERSION | WITH | IDENTIFIER_KW | FIELDS | SET | SNAPSHOT | SNAPSHOTS
+    | TAG | TRUE | FALSE
     | MAP
     ;
 
@@ -212,6 +223,10 @@ snapshotId
     : number
     ;
 
+numSnapshots
+    : number
+    ;
+
 timeUnit
     : DAYS
     | HOURS
@@ -254,6 +269,7 @@ SET: 'SET';
 SNAPSHOT: 'SNAPSHOT';
 SNAPSHOTS: 'SNAPSHOTS';
 TABLE: 'TABLE';
+TAG: 'TAG';
 UNORDERED: 'UNORDERED';
 VERSION: 'VERSION';
 WITH: 'WITH';
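
Relative to branchOptions, the new tagOptions rule deliberately omits the
snapshotRetention alternative: a tag carries only an optional AS OF VERSION
snapshot id and an optional RETAIN clause, whose timeUnit admits only DAYS,
HOURS, or MINUTES. A quick sketch of what the new grammar accepts and rejects
(names and ids are illustrative):

    spark.sql("ALTER TABLE db.sample CREATE TAG t1 AS OF VERSION 8 RETAIN 10 DAYS") // parses
    spark.sql("ALTER TABLE db.sample CREATE TAG t1 RETAIN 10 SECONDS") // IcebergParseException: SECONDS is not a timeUnit
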
diff --git a/spark/v3.2/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/IcebergSparkSqlExtensionsParser.scala b/spark/v3.2/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/IcebergSparkSqlExtensionsParser.scala
index 9b71ec8f5c..2996ceb366 100644
--- a/spark/v3.2/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/IcebergSparkSqlExtensionsParser.scala
+++ b/spark/v3.2/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/IcebergSparkSqlExtensionsParser.scala
@@ -212,7 +212,10 @@ class IcebergSparkSqlExtensionsParser(delegate: ParserInterface) extends ParserI
   private def isSnapshotRefDdl(normalized: String): Boolean = {
     normalized.contains("create branch") ||
       normalized.contains("replace branch") ||
-      normalized.contains("drop branch")
+      normalized.contains("create tag") ||
+      normalized.contains("replace tag") ||
+      normalized.contains("drop branch") ||
+      normalized.contains("drop tag")
   }
 
   protected def parse[T](command: String)(toResult: IcebergSqlExtensionsParser => T): T = {
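
The isSnapshotRefDdl check is a cheap pre-filter: the extension parser only
attempts its own ANTLR grammar when the normalized statement text looks like
Iceberg-specific DDL, and delegates everything else to Spark's parser. A
minimal standalone sketch of the tag-related part of that routing (not the
actual parser code, which also normalizes comments and quoting):

    // Scala sketch: lower-case, collapse whitespace, then substring-match.
    def looksLikeTagDdl(sqlText: String): Boolean = {
      val normalized = sqlText.toLowerCase(java.util.Locale.ROOT).replaceAll("\\s+", " ").trim
      Seq("create tag", "replace tag", "drop tag").exists(normalized.contains)
    }

    looksLikeTagDdl("ALTER TABLE db.sample  DROP TAG t1")  // true
    looksLikeTagDdl("SELECT * FROM db.sample")             // false
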
diff --git a/spark/v3.2/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/IcebergSqlExtensionsAstBuilder.scala b/spark/v3.2/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/IcebergSqlExtensionsAstBuilder.scala
index 09144514ab..f758cb08fd 100644
--- a/spark/v3.2/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/IcebergSqlExtensionsAstBuilder.scala
+++ b/spark/v3.2/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/IcebergSqlExtensionsAstBuilder.scala
@@ -41,15 +41,18 @@ import org.apache.spark.sql.catalyst.plans.logical.BranchOptions
 import org.apache.spark.sql.catalyst.plans.logical.CallArgument
 import org.apache.spark.sql.catalyst.plans.logical.CallStatement
 import org.apache.spark.sql.catalyst.plans.logical.CreateOrReplaceBranch
+import org.apache.spark.sql.catalyst.plans.logical.CreateOrReplaceTag
 import org.apache.spark.sql.catalyst.plans.logical.DropBranch
 import org.apache.spark.sql.catalyst.plans.logical.DropIdentifierFields
 import org.apache.spark.sql.catalyst.plans.logical.DropPartitionField
+import org.apache.spark.sql.catalyst.plans.logical.DropTag
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.plans.logical.NamedArgument
 import org.apache.spark.sql.catalyst.plans.logical.PositionalArgument
 import org.apache.spark.sql.catalyst.plans.logical.ReplacePartitionField
 import org.apache.spark.sql.catalyst.plans.logical.SetIdentifierFields
 import org.apache.spark.sql.catalyst.plans.logical.SetWriteDistributionAndOrdering
+import org.apache.spark.sql.catalyst.plans.logical.TagOptions
 import org.apache.spark.sql.catalyst.trees.CurrentOrigin
 import org.apache.spark.sql.catalyst.trees.Origin
 import org.apache.spark.sql.connector.expressions
@@ -131,6 +134,35 @@ class IcebergSqlExtensionsAstBuilder(delegate: ParserInterface) extends IcebergS
       ifNotExists)
   }
 
+  /**
+   * Create a CREATE OR REPLACE TAG logical command.
+   */
+  override def visitCreateOrReplaceTag(ctx: CreateOrReplaceTagContext): CreateOrReplaceTag = withOrigin(ctx) {
+    val createTagClause = ctx.createReplaceTagClause()
+
+    val tagName = createTagClause.identifier().getText
+
+    val tagOptionsContext = Option(createTagClause.tagOptions())
+    val snapshotId = tagOptionsContext.flatMap(tagOptions => Option(tagOptions.snapshotId()))
+      .map(_.getText.toLong)
+    val tagRetain = tagOptionsContext.flatMap(tagOptions => Option(tagOptions.refRetain()))
+    val tagRefAgeMs = tagRetain.map(retain =>
+      TimeUnit.valueOf(retain.timeUnit().getText.toUpperCase(Locale.ENGLISH)).toMillis(retain.number().getText.toLong))
+    val tagOptions = TagOptions(
+      snapshotId,
+      tagRefAgeMs
+    )
+
+    val replace = createTagClause.REPLACE() != null
+    val ifNotExists = createTagClause.EXISTS() != null
+
+    CreateOrReplaceTag(typedVisit[Seq[String]](ctx.multipartIdentifier),
+      tagName,
+      tagOptions,
+      replace,
+      ifNotExists)
+  }
+
   /**
    * Create a DROP BRANCH logical command.
    */
@@ -138,6 +170,13 @@ class IcebergSqlExtensionsAstBuilder(delegate: ParserInterface) extends IcebergS
     DropBranch(typedVisit[Seq[String]](ctx.multipartIdentifier), ctx.identifier().getText, ctx.EXISTS() != null)
   }
 
+  /**
+   * Create a DROP TAG logical command.
+   */
+  override def visitDropTag(ctx: DropTagContext): DropTag = withOrigin(ctx) {
+    DropTag(typedVisit[Seq[String]](ctx.multipartIdentifier), ctx.identifier().getText, ctx.EXISTS() != null)
+  }
+
   /**
    * Create a REPLACE PARTITION FIELD logical command.
    */
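
The builder converts the RETAIN clause to milliseconds at parse time. A
worked example of the conversion above (values are illustrative):

    import java.util.Locale
    import java.util.concurrent.TimeUnit

    // "RETAIN 10 DAYS" parses into number = 10 and timeUnit = DAYS, so
    // TagOptions.snapshotRefRetain becomes:
    val tagRefAgeMs = TimeUnit.valueOf("days".toUpperCase(Locale.ENGLISH)).toMillis(10L)
    // => 864000000L (10 * 24 * 60 * 60 * 1000)
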
diff --git a/spark/v3.2/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/CreateOrReplaceTag.scala b/spark/v3.2/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/CreateOrReplaceTag.scala
new file mode 100644
index 0000000000..e48f7d8ed0
--- /dev/null
+++ b/spark/v3.2/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/CreateOrReplaceTag.scala
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.catalyst.plans.logical
+
+import org.apache.spark.sql.catalyst.expressions.Attribute
+
+case class CreateOrReplaceTag(
+    table: Seq[String],
+    tag: String,
+    tagOptions: TagOptions,
+    replace: Boolean,
+    ifNotExists: Boolean) extends LeafCommand {
+
+  import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
+
+  override lazy val output: Seq[Attribute] = Nil
+
+  override def simpleString(maxFields: Int): String = {
+    s"CreateOrReplaceTag tag: ${tag} for table: ${table.quoted}"
+  }
+}
diff --git a/spark/v3.2/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/DropTag.scala b/spark/v3.2/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/DropTag.scala
new file mode 100644
index 0000000000..7e4b38e74d
--- /dev/null
+++ b/spark/v3.2/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/DropTag.scala
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.catalyst.plans.logical
+
+import org.apache.spark.sql.catalyst.expressions.Attribute
+
+case class DropTag(table: Seq[String], tag: String, ifExists: Boolean) extends LeafCommand {
+
+  import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
+
+  override lazy val output: Seq[Attribute] = Nil
+
+  override def simpleString(maxFields: Int): String = {
+    s"DropTag tag: ${tag} for table: ${table.quoted}"
+  }
+}
diff --git a/spark/v3.2/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/TagOptions.scala b/spark/v3.2/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/TagOptions.scala
new file mode 100644
index 0000000000..85e3b95f4a
--- /dev/null
+++ b/spark/v3.2/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/TagOptions.scala
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.catalyst.plans.logical
+
+case class TagOptions(snapshotId: Option[Long], snapshotRefRetain: Option[Long])
diff --git a/spark/v3.2/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateOrReplaceTagExec.scala b/spark/v3.2/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateOrReplaceTagExec.scala
new file mode 100644
index 0000000000..7ca193d1b1
--- /dev/null
+++ b/spark/v3.2/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateOrReplaceTagExec.scala
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.v2
+
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions
+import org.apache.iceberg.spark.source.SparkTable
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.plans.logical.TagOptions
+import org.apache.spark.sql.connector.catalog._
+
+case class CreateOrReplaceTagExec(
+    catalog: TableCatalog,
+    ident: Identifier,
+    tag: String,
+    tagOptions: TagOptions,
+    replace: Boolean,
+    ifNotExists: Boolean) extends LeafV2CommandExec {
+
+  import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
+
+  override lazy val output: Seq[Attribute] = Nil
+
+  override protected def run(): Seq[InternalRow] = {
+    catalog.loadTable(ident) match {
+      case iceberg: SparkTable =>
+        val snapshotId: java.lang.Long = tagOptions.snapshotId
+          .orElse(Option(iceberg.table.currentSnapshot()).map(_.snapshotId()))
+          .map(java.lang.Long.valueOf)
+          .orNull
+
+        Preconditions.checkArgument(snapshotId != null,
+          "Cannot complete create or replace tag operation on %s, main has no snapshot", ident)
+
+        val manageSnapshot = iceberg.table.manageSnapshots()
+        if (!replace) {
+          val ref = iceberg.table().refs().get(tag)
+          if (ref != null && ifNotExists) {
+            return Nil
+          }
+
+          manageSnapshot.createTag(tag, snapshotId)
+        } else {
+          manageSnapshot.replaceTag(tag, snapshotId)
+        }
+
+        if (tagOptions.snapshotRefRetain.nonEmpty) {
+          manageSnapshot.setMaxRefAgeMs(tag, tagOptions.snapshotRefRetain.get)
+        }
+
+        manageSnapshot.commit()
+
+      case table =>
+        throw new UnsupportedOperationException(s"Cannot create tag on non-Iceberg table: $table")
+    }
+
+    Nil
+  }
+
+  override def simpleString(maxFields: Int): String = {
+    s"Create tag: $tag for table: ${ident.quoted}"
+  }
+}
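
Note the fallback in run(): when the statement carries no AS OF VERSION
clause, the exec tags the current snapshot of main, and the precondition
fails if the table has no snapshot at all. A minimal sketch of that
resolution, assuming an already-loaded org.apache.iceberg.Table:

    def resolveTagSnapshotId(table: org.apache.iceberg.Table, requested: Option[Long]): Option[Long] =
      // Prefer the explicit AS OF VERSION id; otherwise fall back to main's
      // current snapshot, which may be absent on an empty table.
      requested.orElse(Option(table.currentSnapshot()).map(_.snapshotId()))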
diff --git a/spark/v3.2/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DropTagExec.scala b/spark/v3.2/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DropTagExec.scala
new file mode 100644
index 0000000000..0a1c17c0b1
--- /dev/null
+++ b/spark/v3.2/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DropTagExec.scala
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.v2
+
+import org.apache.iceberg.spark.source.SparkTable
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.connector.catalog.Identifier
+import org.apache.spark.sql.connector.catalog.TableCatalog
+
+case class DropTagExec(
+    catalog: TableCatalog,
+    ident: Identifier,
+    tag: String,
+    ifExists: Boolean) extends LeafV2CommandExec {
+
+  import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
+
+  override lazy val output: Seq[Attribute] = Nil
+
+  override protected def run(): Seq[InternalRow] = {
+    catalog.loadTable(ident) match {
+      case iceberg: SparkTable =>
+        val ref = iceberg.table().refs().get(tag)
+        if (ref != null || !ifExists) {
+          iceberg.table().manageSnapshots().removeTag(tag).commit()
+        }
+
+      case table =>
+        throw new UnsupportedOperationException(s"Cannot drop tag on non-Iceberg table: $table")
+    }
+
+    Nil
+  }
+
+  override def simpleString(maxFields: Int): String = {
+    s"DropTag tag: ${tag} for table: ${ident.quoted}"
+  }
+}
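
The condition "ref != null || !ifExists" encodes the IF EXISTS semantics:
removeTag is skipped only when the tag is absent and IF EXISTS was given;
without IF EXISTS, an absent tag still reaches removeTag, which raises the
"Tag does not exist" error seen in the tests below. A minimal sketch of the
same logic against the Iceberg API, assuming an already-loaded
org.apache.iceberg.Table:

    def dropTag(table: org.apache.iceberg.Table, tag: String, ifExists: Boolean): Unit = {
      val ref = table.refs().get(tag)
      if (ref != null || !ifExists) {
        table.manageSnapshots().removeTag(tag).commit()
      }
    }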
diff --git a/spark/v3.2/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ExtendedDataSourceV2Strategy.scala b/spark/v3.2/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ExtendedDataSourceV2Strategy.scala
index 19d5fbedbf..326574bf25 100644
--- a/spark/v3.2/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ExtendedDataSourceV2Strategy.scala
+++ b/spark/v3.2/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ExtendedDataSourceV2Strategy.scala
@@ -32,10 +32,12 @@ import org.apache.spark.sql.catalyst.expressions.PredicateHelper
 import org.apache.spark.sql.catalyst.plans.logical.AddPartitionField
 import org.apache.spark.sql.catalyst.plans.logical.Call
 import org.apache.spark.sql.catalyst.plans.logical.CreateOrReplaceBranch
+import org.apache.spark.sql.catalyst.plans.logical.CreateOrReplaceTag
 import org.apache.spark.sql.catalyst.plans.logical.DeleteFromIcebergTable
 import org.apache.spark.sql.catalyst.plans.logical.DropBranch
 import org.apache.spark.sql.catalyst.plans.logical.DropIdentifierFields
 import org.apache.spark.sql.catalyst.plans.logical.DropPartitionField
+import org.apache.spark.sql.catalyst.plans.logical.DropTag
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.plans.logical.MergeRows
 import org.apache.spark.sql.catalyst.plans.logical.NoStatsUnaryNode
@@ -70,12 +72,18 @@ case class ExtendedDataSourceV2Strategy(spark: SparkSession) extends Strategy wi
       ReplacePartitionFieldExec(catalog, ident, transformFrom, transformTo, name) :: Nil
 
     case CreateOrReplaceBranch(
-    IcebergCatalogAndIdentifier(catalog, ident), branch, branchOptions, replace, ifNotExists) =>
+        IcebergCatalogAndIdentifier(catalog, ident), branch, branchOptions, replace, ifNotExists) =>
       CreateOrReplaceBranchExec(catalog, ident, branch, branchOptions, replace, ifNotExists) :: Nil
 
+    case CreateOrReplaceTag(IcebergCatalogAndIdentifier(catalog, ident), tag, tagOptions, replace, ifNotExists) =>
+      CreateOrReplaceTagExec(catalog, ident, tag, tagOptions, replace, ifNotExists) :: Nil
+
     case DropBranch(IcebergCatalogAndIdentifier(catalog, ident), branch, ifExists) =>
       DropBranchExec(catalog, ident, branch, ifExists) :: Nil
 
+    case DropTag(IcebergCatalogAndIdentifier(catalog, ident), tag, ifExists) =>
+      DropTagExec(catalog, ident, tag, ifExists) :: Nil
+
     case SetIdentifierFields(IcebergCatalogAndIdentifier(catalog, ident), fields) =>
       SetIdentifierFieldsExec(catalog, ident, fields) :: Nil
 
diff --git a/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestTagDDL.java b/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestTagDDL.java
new file mode 100644
index 0000000000..ec3148de6c
--- /dev/null
+++ b/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestTagDDL.java
@@ -0,0 +1,374 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.extensions;
+
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import org.apache.iceberg.AssertHelpers;
+import org.apache.iceberg.SnapshotRef;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.exceptions.ValidationException;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.spark.SparkCatalogConfig;
+import org.apache.iceberg.spark.source.SimpleRecord;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
+import org.apache.spark.sql.catalyst.parser.extensions.IcebergParseException;
+import org.assertj.core.api.Assertions;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runners.Parameterized;
+
+public class TestTagDDL extends SparkExtensionsTestBase {
+  private static final String[] TIME_UNITS = {"DAYS", "HOURS", "MINUTES"};
+
+  @Parameterized.Parameters(name = "catalogName = {0}, implementation = {1}, config = {2}")
+  public static Object[][] parameters() {
+    return new Object[][] {
+      {
+        SparkCatalogConfig.SPARK.catalogName(),
+        SparkCatalogConfig.SPARK.implementation(),
+        SparkCatalogConfig.SPARK.properties()
+      }
+    };
+  }
+
+  public TestTagDDL(String catalogName, String implementation, Map<String, String> config) {
+    super(catalogName, implementation, config);
+  }
+
+  @Before
+  public void before() {
+    sql("CREATE TABLE %s (id INT, data STRING) USING iceberg", tableName);
+  }
+
+  @After
+  public void removeTable() {
+    sql("DROP TABLE IF EXISTS %s", tableName);
+  }
+
+  @Test
+  public void testCreateTagWithRetain() throws NoSuchTableException {
+    Table table = insertRows();
+    long firstSnapshotId = table.currentSnapshot().snapshotId();
+    long maxRefAge = 10L;
+
+    List<SimpleRecord> records =
+        ImmutableList.of(new SimpleRecord(1, "a"), new SimpleRecord(2, "b"));
+    Dataset<Row> df = spark.createDataFrame(records, SimpleRecord.class);
+    df.writeTo(tableName).append();
+
+    for (String timeUnit : TIME_UNITS) {
+      String tagName = "t1" + timeUnit;
+      sql(
+          "ALTER TABLE %s CREATE TAG %s AS OF VERSION %d RETAIN %d %s",
+          tableName, tagName, firstSnapshotId, maxRefAge, timeUnit);
+      table.refresh();
+      SnapshotRef ref = table.refs().get(tagName);
+      Assert.assertEquals(
+          "The tag needs to point to a specific snapshot id.", firstSnapshotId, ref.snapshotId());
+      Assert.assertEquals(
+          "The tag needs to have the correct max ref age.",
+          TimeUnit.valueOf(timeUnit.toUpperCase(Locale.ENGLISH)).toMillis(maxRefAge),
+          ref.maxRefAgeMs().longValue());
+    }
+
+    String tagName = "t1";
+    AssertHelpers.assertThrows(
+        "Illegal statement",
+        IcebergParseException.class,
+        "mismatched input",
+        () ->
+            sql(
+                "ALTER TABLE %s CREATE TAG %s AS OF VERSION %d RETAIN",
+                tableName, tagName, firstSnapshotId, maxRefAge));
+
+    AssertHelpers.assertThrows(
+        "Illegal statement",
+        IcebergParseException.class,
+        "mismatched input",
+        () -> sql("ALTER TABLE %s CREATE TAG %s RETAIN %s DAYS", tableName, tagName, "abc"));
+
+    AssertHelpers.assertThrows(
+        "Illegal statement",
+        IcebergParseException.class,
+        "mismatched input 'SECONDS' expecting {'DAYS', 'HOURS', 'MINUTES'}",
+        () ->
+            sql(
+                "ALTER TABLE %s CREATE TAG %s AS OF VERSION %d RETAIN %d SECONDS",
+                tableName, tagName, firstSnapshotId, maxRefAge));
+  }
+
+  @Test
+  public void testCreateTagOnEmptyTable() {
+    Assertions.assertThatThrownBy(() -> sql("ALTER TABLE %s CREATE TAG %s", tableName, "abc"))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessageContaining(
+            "Cannot complete create or replace tag operation on %s, main has no snapshot",
+            tableName);
+  }
+
+  @Test
+  public void testCreateTagUseDefaultConfig() throws NoSuchTableException {
+    Table table = insertRows();
+    long snapshotId = table.currentSnapshot().snapshotId();
+    String tagName = "t1";
+
+    AssertHelpers.assertThrows(
+        "unknown snapshot",
+        ValidationException.class,
+        "unknown snapshot: -1",
+        () -> sql("ALTER TABLE %s CREATE TAG %s AS OF VERSION %d", tableName, tagName, -1));
+
+    sql("ALTER TABLE %s CREATE TAG %s", tableName, tagName);
+    table.refresh();
+    SnapshotRef ref = table.refs().get(tagName);
+    Assert.assertEquals(
+        "The tag needs to point to a specific snapshot id.", snapshotId, ref.snapshotId());
+    Assert.assertNull(
+        "The tag needs to have the default max ref age, which is null.", ref.maxRefAgeMs());
+
+    AssertHelpers.assertThrows(
+        "Cannot create an exist tag",
+        IllegalArgumentException.class,
+        "already exists",
+        () -> sql("ALTER TABLE %s CREATE TAG %s", tableName, tagName));
+
+    AssertHelpers.assertThrows(
+        "Non-conforming tag name",
+        IcebergParseException.class,
+        "mismatched input '123'",
+        () -> sql("ALTER TABLE %s CREATE TAG %s", tableName, "123"));
+
+    table.manageSnapshots().removeTag(tagName).commit();
+    List<SimpleRecord> records =
+        ImmutableList.of(new SimpleRecord(1, "a"), new SimpleRecord(2, "b"));
+    Dataset<Row> df = spark.createDataFrame(records, SimpleRecord.class);
+    df.writeTo(tableName).append();
+    snapshotId = table.currentSnapshot().snapshotId();
+    sql("ALTER TABLE %s CREATE TAG %s AS OF VERSION %d", tableName, tagName, snapshotId);
+    table.refresh();
+    ref = table.refs().get(tagName);
+    Assert.assertEquals(
+        "The tag needs to point to a specific snapshot id.", snapshotId, ref.snapshotId());
+    Assert.assertNull(
+        "The tag needs to have the default max ref age, which is null.", ref.maxRefAgeMs());
+  }
+
+  @Test
+  public void testCreateTagIfNotExists() throws NoSuchTableException {
+    long maxSnapshotAge = 2L;
+    Table table = insertRows();
+    String tagName = "t1";
+    sql("ALTER TABLE %s CREATE TAG %s RETAIN %d days", tableName, tagName, maxSnapshotAge);
+    sql("ALTER TABLE %s CREATE TAG IF NOT EXISTS %s", tableName, tagName);
+
+    table.refresh();
+    SnapshotRef ref = table.refs().get(tagName);
+    Assert.assertEquals(
+        "The tag needs to point to a specific snapshot id.",
+        table.currentSnapshot().snapshotId(),
+        ref.snapshotId());
+    Assert.assertEquals(
+        "The tag needs to have the correct max ref age.",
+        TimeUnit.DAYS.toMillis(maxSnapshotAge),
+        ref.maxRefAgeMs().longValue());
+  }
+
+  @Test
+  public void testReplaceTagFailsForBranch() throws NoSuchTableException {
+    String branchName = "branch1";
+    Table table = insertRows();
+    long first = table.currentSnapshot().snapshotId();
+    table.manageSnapshots().createBranch(branchName, first).commit();
+    insertRows();
+    long second = table.currentSnapshot().snapshotId();
+
+    AssertHelpers.assertThrows(
+        "Cannot perform replace tag on branches",
+        IllegalArgumentException.class,
+        "Ref branch1 is a branch not a tag",
+        () -> sql("ALTER TABLE %s REPLACE Tag %s", tableName, branchName, second));
+  }
+
+  @Test
+  public void testReplaceTag() throws NoSuchTableException {
+    Table table = insertRows();
+    long first = table.currentSnapshot().snapshotId();
+    String tagName = "t1";
+    long expectedMaxRefAgeMs = 1000;
+    table
+        .manageSnapshots()
+        .createTag(tagName, first)
+        .setMaxRefAgeMs(tagName, expectedMaxRefAgeMs)
+        .commit();
+
+    insertRows();
+    long second = table.currentSnapshot().snapshotId();
+
+    sql("ALTER TABLE %s REPLACE Tag %s AS OF VERSION %d", tableName, tagName, second);
+    table.refresh();
+    SnapshotRef ref = table.refs().get(tagName);
+    Assert.assertEquals(
+        "The tag needs to point to a specific snapshot id.", second, ref.snapshotId());
+    Assert.assertEquals(
+        "The tag needs to have the correct max ref age.",
+        expectedMaxRefAgeMs,
+        ref.maxRefAgeMs().longValue());
+  }
+
+  @Test
+  public void testReplaceTagDoesNotExist() throws NoSuchTableException {
+    Table table = insertRows();
+
+    AssertHelpers.assertThrows(
+        "Cannot perform replace tag on tag which does not exist",
+        IllegalArgumentException.class,
+        "Tag does not exist",
+        () ->
+            sql(
+                "ALTER TABLE %s REPLACE Tag %s AS OF VERSION %d",
+                tableName, "someTag", table.currentSnapshot().snapshotId()));
+  }
+
+  @Test
+  public void testReplaceTagWithRetain() throws NoSuchTableException {
+    Table table = insertRows();
+    long first = table.currentSnapshot().snapshotId();
+    String tagName = "t1";
+    table.manageSnapshots().createTag(tagName, first).commit();
+    insertRows();
+    long second = table.currentSnapshot().snapshotId();
+
+    long maxRefAge = 10;
+    for (String timeUnit : TIME_UNITS) {
+      sql(
+          "ALTER TABLE %s REPLACE Tag %s AS OF VERSION %d RETAIN %d %s",
+          tableName, tagName, second, maxRefAge, timeUnit);
+
+      table.refresh();
+      SnapshotRef ref = table.refs().get(tagName);
+      Assert.assertEquals(
+          "The tag needs to point to a specific snapshot id.", second, ref.snapshotId());
+      Assert.assertEquals(
+          "The tag needs to have the correct max ref age.",
+          TimeUnit.valueOf(timeUnit).toMillis(maxRefAge),
+          ref.maxRefAgeMs().longValue());
+    }
+  }
+
+  @Test
+  public void testCreateOrReplace() throws NoSuchTableException {
+    Table table = insertRows();
+    long first = table.currentSnapshot().snapshotId();
+    String tagName = "t1";
+    insertRows();
+    long second = table.currentSnapshot().snapshotId();
+    table.manageSnapshots().createTag(tagName, second).commit();
+
+    sql("ALTER TABLE %s CREATE OR REPLACE TAG %s AS OF VERSION %d", tableName, tagName, first);
+    table.refresh();
+    SnapshotRef ref = table.refs().get(tagName);
+    Assert.assertEquals(
+        "The tag needs to point to a specific snapshot id.", first, ref.snapshotId());
+  }
+
+  @Test
+  public void testDropTag() throws NoSuchTableException {
+    insertRows();
+    Table table = validationCatalog.loadTable(tableIdent);
+    String tagName = "t1";
+    table.manageSnapshots().createTag(tagName, table.currentSnapshot().snapshotId()).commit();
+    SnapshotRef ref = table.refs().get(tagName);
+    Assert.assertEquals(
+        "The tag needs to point to a specific snapshot id.",
+        table.currentSnapshot().snapshotId(),
+        ref.snapshotId());
+
+    sql("ALTER TABLE %s DROP TAG %s", tableName, tagName);
+    table.refresh();
+    ref = table.refs().get(tagName);
+    Assert.assertNull("The tag needs to be dropped.", ref);
+  }
+
+  @Test
+  public void testDropTagNonConformingName() {
+    AssertHelpers.assertThrows(
+        "Non-conforming tag name",
+        IcebergParseException.class,
+        "mismatched input '123'",
+        () -> sql("ALTER TABLE %s DROP TAG %s", tableName, "123"));
+  }
+
+  @Test
+  public void testDropTagDoesNotExist() {
+    AssertHelpers.assertThrows(
+        "Cannot perform drop tag on tag which does not exist",
+        IllegalArgumentException.class,
+        "Tag does not exist: nonExistingTag",
+        () -> sql("ALTER TABLE %s DROP TAG %s", tableName, "nonExistingTag"));
+  }
+
+  @Test
+  public void testDropTagFailsForBranch() throws NoSuchTableException {
+    String branchName = "b1";
+    Table table = insertRows();
+    table.manageSnapshots().createBranch(branchName, table.currentSnapshot().snapshotId()).commit();
+
+    AssertHelpers.assertThrows(
+        "Cannot perform drop tag on branch",
+        IllegalArgumentException.class,
+        "Ref b1 is a branch not a tag",
+        () -> sql("ALTER TABLE %s DROP TAG %s", tableName, branchName));
+  }
+
+  @Test
+  public void testDropTagIfExists() throws NoSuchTableException {
+    String tagName = "nonExistingTag";
+    Table table = insertRows();
+    Assert.assertNull("The tag does not exists.", table.refs().get(tagName));
+
+    sql("ALTER TABLE %s DROP TAG IF EXISTS %s", tableName, tagName);
+    table.refresh();
+    Assert.assertNull("The tag still does not exist.", table.refs().get(tagName));
+
+    table.manageSnapshots().createTag(tagName, table.currentSnapshot().snapshotId()).commit();
+    Assert.assertEquals(
+        "The tag has been created successfully.",
+        table.currentSnapshot().snapshotId(),
+        table.refs().get(tagName).snapshotId());
+
+    sql("ALTER TABLE %s DROP TAG IF EXISTS %s", tableName, tagName);
+    table.refresh();
+    Assert.assertNull("The tag needs to be dropped.", table.refs().get(tagName));
+  }
+
+  private Table insertRows() throws NoSuchTableException {
+    List<SimpleRecord> records =
+        ImmutableList.of(new SimpleRecord(1, "a"), new SimpleRecord(2, "b"));
+    Dataset<Row> df = spark.createDataFrame(records, SimpleRecord.class);
+    df.writeTo(tableName).append();
+    return validationCatalog.loadTable(tableIdent);
+  }
+}