Posted to commits@spark.apache.org by we...@apache.org on 2017/11/09 10:58:02 UTC

spark git commit: [SPARK-22405][SQL] Add new alter table and alter database related ExternalCatalogEvent

Repository: spark
Updated Branches:
  refs/heads/master 40a8aefaf -> 6793a3dac


[SPARK-22405][SQL] Add new alter table and alter database related ExternalCatalogEvent

## What changes were proposed in this pull request?

We're building a data lineage tool in which we need to monitor metadata changes in ExternalCatalog. The current ExternalCatalog already provides several useful events, such as "CreateDatabaseEvent", for a custom SparkListener to consume, but some events are still missing, such as alter database and alter table events. So this PR proposes to add new ExternalCatalogEvents.
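
For illustration, a minimal sketch of how such a lineage tool could consume these events from a custom SparkListener. It assumes the session forwards ExternalCatalogEvents to the Spark listener bus (ExternalCatalogEvent extends SparkListenerEvent, so they surface through onOtherEvent); the class name CatalogChangeListener is ours:

  import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent}
  import org.apache.spark.sql.catalyst.catalog._

  // Hypothetical lineage listener: catalog events arriving on the listener
  // bus are SparkListenerEvents, so they show up in onOtherEvent.
  class CatalogChangeListener extends SparkListener {
    override def onOtherEvent(event: SparkListenerEvent): Unit = event match {
      case AlterDatabaseEvent(db) =>
        println(s"database altered: $db")
      case AlterTableEvent(db, table, kind) =>
        println(s"table altered: $db.$table, kind = $kind")
      case e: ExternalCatalogEvent =>
        println(s"other catalog event: $e")
      case _ => // not a catalog event; ignore
    }
  }

Such a listener would be registered in the usual ways, e.g. sparkContext.addSparkListener or the spark.extraListeners configuration.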

## How was this patch tested?

Enriched the current UTs and tested on a local cluster.

CC hvanhovell, please let me know your comments on the current proposal, thanks.

Author: jerryshao <ss...@hortonworks.com>

Closes #19649 from jerryshao/SPARK-22405.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6793a3da
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6793a3da
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6793a3da

Branch: refs/heads/master
Commit: 6793a3dac0a44570625044e1eb30fa578fa2f142
Parents: 40a8aef
Author: jerryshao <ss...@hortonworks.com>
Authored: Thu Nov 9 11:57:56 2017 +0100
Committer: Wenchen Fan <we...@databricks.com>
Committed: Thu Nov 9 11:57:56 2017 +0100

----------------------------------------------------------------------
 .../sql/catalyst/catalog/ExternalCatalog.scala  | 35 +++++++++++++++---
 .../sql/catalyst/catalog/InMemoryCatalog.scala  |  8 ++---
 .../spark/sql/catalyst/catalog/events.scala     | 38 +++++++++++++++++++-
 .../catalog/ExternalCatalogEventSuite.scala     | 22 ++++++++++++
 .../spark/sql/hive/HiveExternalCatalog.scala    | 18 +++++++---
 5 files changed, 107 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/6793a3da/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala
index 223094d..45b4f01 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala
@@ -87,7 +87,14 @@ abstract class ExternalCatalog
    * Note: If the underlying implementation does not support altering a certain field,
    * this becomes a no-op.
    */
-  def alterDatabase(dbDefinition: CatalogDatabase): Unit
+  final def alterDatabase(dbDefinition: CatalogDatabase): Unit = {
+    val db = dbDefinition.name
+    postToAll(AlterDatabasePreEvent(db))
+    doAlterDatabase(dbDefinition)
+    postToAll(AlterDatabaseEvent(db))
+  }
+
+  protected def doAlterDatabase(dbDefinition: CatalogDatabase): Unit
 
   def getDatabase(db: String): CatalogDatabase
 
@@ -147,7 +154,15 @@ abstract class ExternalCatalog
    * Note: If the underlying implementation does not support altering a certain field,
    * this becomes a no-op.
    */
-  def alterTable(tableDefinition: CatalogTable): Unit
+  final def alterTable(tableDefinition: CatalogTable): Unit = {
+    val db = tableDefinition.database
+    val name = tableDefinition.identifier.table
+    postToAll(AlterTablePreEvent(db, name, AlterTableKind.TABLE))
+    doAlterTable(tableDefinition)
+    postToAll(AlterTableEvent(db, name, AlterTableKind.TABLE))
+  }
+
+  protected def doAlterTable(tableDefinition: CatalogTable): Unit
 
   /**
    * Alter the data schema of a table identified by the provided database and table name. The new
@@ -158,10 +173,22 @@ abstract class ExternalCatalog
    * @param table Name of table to alter schema for
    * @param newDataSchema Updated data schema to be used for the table.
    */
-  def alterTableDataSchema(db: String, table: String, newDataSchema: StructType): Unit
+  final def alterTableDataSchema(db: String, table: String, newDataSchema: StructType): Unit = {
+    postToAll(AlterTablePreEvent(db, table, AlterTableKind.DATASCHEMA))
+    doAlterTableDataSchema(db, table, newDataSchema)
+    postToAll(AlterTableEvent(db, table, AlterTableKind.DATASCHEMA))
+  }
+
+  protected def doAlterTableDataSchema(db: String, table: String, newDataSchema: StructType): Unit
 
   /** Alter the statistics of a table. If `stats` is None, then remove all existing statistics. */
-  def alterTableStats(db: String, table: String, stats: Option[CatalogStatistics]): Unit
+  final def alterTableStats(db: String, table: String, stats: Option[CatalogStatistics]): Unit = {
+    postToAll(AlterTablePreEvent(db, table, AlterTableKind.STATS))
+    doAlterTableStats(db, table, stats)
+    postToAll(AlterTableEvent(db, table, AlterTableKind.STATS))
+  }
+
+  protected def doAlterTableStats(db: String, table: String, stats: Option[CatalogStatistics]): Unit
 
   def getTable(db: String, table: String): CatalogTable
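
The pattern above is a template method: the public alter* entry points are now final and bracket the protected do* hooks with pre/post events, so every implementation emits them consistently. A minimal sketch of what a custom implementation would now override (MyCatalog is a hypothetical name):

  import org.apache.spark.sql.catalyst.catalog.{CatalogTable, InMemoryCatalog}

  // Hypothetical subclass: implementations override the protected hooks; the
  // final public methods fire the AlterTablePreEvent/AlterTableEvent pair.
  class MyCatalog extends InMemoryCatalog {
    override def doAlterTable(tableDefinition: CatalogTable): Unit = {
      // custom bookkeeping could go here
      super.doAlterTable(tableDefinition)
    }
  }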
 

http://git-wip-us.apache.org/repos/asf/spark/blob/6793a3da/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
index 9504140..8eacfa0 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
@@ -152,7 +152,7 @@ class InMemoryCatalog(
     }
   }
 
-  override def alterDatabase(dbDefinition: CatalogDatabase): Unit = synchronized {
+  override def doAlterDatabase(dbDefinition: CatalogDatabase): Unit = synchronized {
     requireDbExists(dbDefinition.name)
     catalog(dbDefinition.name).db = dbDefinition
   }
@@ -294,7 +294,7 @@ class InMemoryCatalog(
     catalog(db).tables.remove(oldName)
   }
 
-  override def alterTable(tableDefinition: CatalogTable): Unit = synchronized {
+  override def doAlterTable(tableDefinition: CatalogTable): Unit = synchronized {
     assert(tableDefinition.identifier.database.isDefined)
     val db = tableDefinition.identifier.database.get
     requireTableExists(db, tableDefinition.identifier.table)
@@ -303,7 +303,7 @@ class InMemoryCatalog(
     catalog(db).tables(tableDefinition.identifier.table).table = newTableDefinition
   }
 
-  override def alterTableDataSchema(
+  override def doAlterTableDataSchema(
       db: String,
       table: String,
       newDataSchema: StructType): Unit = synchronized {
@@ -313,7 +313,7 @@ class InMemoryCatalog(
     catalog(db).tables(table).table = origTable.copy(schema = newSchema)
   }
 
-  override def alterTableStats(
+  override def doAlterTableStats(
       db: String,
       table: String,
       stats: Option[CatalogStatistics]): Unit = synchronized {

http://git-wip-us.apache.org/repos/asf/spark/blob/6793a3da/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/events.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/events.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/events.scala
index 742a51e..e7d4164 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/events.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/events.scala
@@ -62,6 +62,16 @@ case class DropDatabasePreEvent(database: String) extends DatabaseEvent
 case class DropDatabaseEvent(database: String) extends DatabaseEvent
 
 /**
+ * Event fired before a database is altered.
+ */
+case class AlterDatabasePreEvent(database: String) extends DatabaseEvent
+
+/**
+ * Event fired after a database is altered.
+ */
+case class AlterDatabaseEvent(database: String) extends DatabaseEvent
+
+/**
  * Event fired when a table is created, dropped or renamed.
  */
 trait TableEvent extends DatabaseEvent {
@@ -110,7 +120,33 @@ case class RenameTableEvent(
   extends TableEvent
 
 /**
- * Event fired when a function is created, dropped or renamed.
+ * String to indicate which part of a table is altered. If the plain alterTable API is
+ * called, the kind will generally be TABLE.
+ */
+object AlterTableKind extends Enumeration {
+  val TABLE = "table"
+  val DATASCHEMA = "dataSchema"
+  val STATS = "stats"
+}
+
+/**
+ * Event fired before a table is altered.
+ */
+case class AlterTablePreEvent(
+    database: String,
+    name: String,
+    kind: String) extends TableEvent
+
+/**
+ * Event fired after a table is altered.
+ */
+case class AlterTableEvent(
+    database: String,
+    name: String,
+    kind: String) extends TableEvent
+
+/**
+ * Event fired when a function is created, dropped, altered or renamed.
  */
 trait FunctionEvent extends DatabaseEvent {
   /**
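
A hedged example of dispatching on the new event types (the describe helper below is ours, not part of the patch); since AlterTableKind values are plain strings, the kind field can be matched or logged directly:

  import org.apache.spark.sql.catalyst.catalog._

  // Hypothetical helper turning the new events into human-readable messages.
  def describe(event: ExternalCatalogEvent): String = event match {
    case AlterDatabasePreEvent(db)       => s"about to alter database $db"
    case AlterDatabaseEvent(db)          => s"altered database $db"
    case AlterTablePreEvent(db, t, kind) => s"about to alter $kind of $db.$t"
    case AlterTableEvent(db, t, kind)    => s"altered $kind of $db.$t"
    case other                           => s"other catalog event: $other"
  }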

http://git-wip-us.apache.org/repos/asf/spark/blob/6793a3da/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogEventSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogEventSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogEventSuite.scala
index 087c26f..1acbe34 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogEventSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogEventSuite.scala
@@ -75,6 +75,11 @@ class ExternalCatalogEventSuite extends SparkFunSuite {
     }
     checkEvents(CreateDatabasePreEvent("db5") :: Nil)
 
+    // ALTER
+    val newDbDefinition = dbDefinition.copy(description = "test")
+    catalog.alterDatabase(newDbDefinition)
+    checkEvents(AlterDatabasePreEvent("db5") :: AlterDatabaseEvent("db5") :: Nil)
+
     // DROP
     intercept[AnalysisException] {
       catalog.dropDatabase("db4", ignoreIfNotExists = false, cascade = false)
@@ -119,6 +124,23 @@ class ExternalCatalogEventSuite extends SparkFunSuite {
     }
     checkEvents(CreateTablePreEvent("db5", "tbl1") :: Nil)
 
+    // ALTER
+    val newTableDefinition = tableDefinition.copy(tableType = CatalogTableType.EXTERNAL)
+    catalog.alterTable(newTableDefinition)
+    checkEvents(AlterTablePreEvent("db5", "tbl1", AlterTableKind.TABLE) ::
+      AlterTableEvent("db5", "tbl1", AlterTableKind.TABLE) :: Nil)
+
+    // ALTER schema
+    val newSchema = new StructType().add("id", "long", nullable = false)
+    catalog.alterTableDataSchema("db5", "tbl1", newSchema)
+    checkEvents(AlterTablePreEvent("db5", "tbl1", AlterTableKind.DATASCHEMA) ::
+      AlterTableEvent("db5", "tbl1", AlterTableKind.DATASCHEMA) :: Nil)
+
+    // ALTER stats
+    catalog.alterTableStats("db5", "tbl1", None)
+    checkEvents(AlterTablePreEvent("db5", "tbl1", AlterTableKind.STATS) ::
+      AlterTableEvent("db5", "tbl1", AlterTableKind.STATS) :: Nil)
+
     // RENAME
     catalog.renameTable("db5", "tbl1", "tbl2")
     checkEvents(
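
The assertions above follow the suite's existing fixture: a recording listener is attached to the catalog and checkEvents compares the captured sequence. A minimal sketch of that wiring (the recorder name is ours); the catalog itself is a ListenerBus here, so listeners attach directly:

  import scala.collection.mutable
  import org.apache.spark.sql.catalyst.catalog._

  // Sketch: capture every posted catalog event in order.
  val recorder = mutable.Buffer.empty[ExternalCatalogEvent]
  val catalog: ExternalCatalog = new InMemoryCatalog
  catalog.addListener(new ExternalCatalogEventListener {
    override def onEvent(event: ExternalCatalogEvent): Unit = recorder += event
  })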

http://git-wip-us.apache.org/repos/asf/spark/blob/6793a3da/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
index f8a947b..7cd7725 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
@@ -177,7 +177,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
    *
    * Note: As of now, this only supports altering database properties!
    */
-  override def alterDatabase(dbDefinition: CatalogDatabase): Unit = withClient {
+  override def doAlterDatabase(dbDefinition: CatalogDatabase): Unit = withClient {
     val existingDb = getDatabase(dbDefinition.name)
     if (existingDb.properties == dbDefinition.properties) {
       logWarning(s"Request to alter database ${dbDefinition.name} is a no-op because " +
@@ -540,7 +540,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
    * Note: As of now, this doesn't support altering table schema, partition column names and bucket
    * specification. We will ignore them even if users do specify different values for these fields.
    */
-  override def alterTable(tableDefinition: CatalogTable): Unit = withClient {
+  override def doAlterTable(tableDefinition: CatalogTable): Unit = withClient {
     assert(tableDefinition.identifier.database.isDefined)
     val db = tableDefinition.identifier.database.get
     requireTableExists(db, tableDefinition.identifier.table)
@@ -619,8 +619,15 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
     }
   }
 
-  override def alterTableDataSchema(
-      db: String, table: String, newDataSchema: StructType): Unit = withClient {
+  /**
+   * Alter the data schema of a table identified by the provided database and table name. The new
+   * data schema should not have conflict column names with the existing partition columns, and
+   * should still contain all the existing data columns.
+   */
+  override def doAlterTableDataSchema(
+      db: String,
+      table: String,
+      newDataSchema: StructType): Unit = withClient {
     requireTableExists(db, table)
     val oldTable = getTable(db, table)
     verifyDataSchema(oldTable.identifier, oldTable.tableType, newDataSchema)
@@ -648,7 +655,8 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
     }
   }
 
-  override def alterTableStats(
+  /** Alter the statistics of a table. If `stats` is None, then remove all existing statistics. */
+  override def doAlterTableStats(
       db: String,
       table: String,
       stats: Option[CatalogStatistics]): Unit = withClient {
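
End to end, a hedged usage example (assuming catalog is an ExternalCatalog in scope and database db5 exists): the final alterDatabase wrapper now posts AlterDatabasePreEvent before, and AlterDatabaseEvent after, the underlying doAlterDatabase call, which in the Hive implementation only applies the properties change:

  // Hypothetical call site; "owner" -> "etl" is an arbitrary property.
  val existing = catalog.getDatabase("db5")
  catalog.alterDatabase(existing.copy(properties = existing.properties + ("owner" -> "etl")))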

