You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by rx...@apache.org on 2017/04/21 07:05:05 UTC

spark git commit: [SPARK-20420][SQL] Add events to the external catalog

Repository: spark
Updated Branches:
  refs/heads/master 48d760d02 -> e2b3d2367


[SPARK-20420][SQL] Add events to the external catalog

## What changes were proposed in this pull request?
It is often useful to be able to track changes to the `ExternalCatalog`. This PR makes the `ExternalCatalog` emit events when a catalog object is changed. Events are fired before and after the change.

The following events are fired per object:

- Database
  - CreateDatabasePreEvent: event fired before the database is created.
  - CreateDatabaseEvent: event fired after the database has been created.
  - DropDatabasePreEvent: event fired before the database is dropped.
  - DropDatabaseEvent: event fired after the database has been dropped.
- Table
  - CreateTablePreEvent: event fired before the table is created.
  - CreateTableEvent: event fired after the table has been created.
  - RenameTablePreEvent: event fired before the table is renamed.
  - RenameTableEvent: event fired after the table has been renamed.
  - DropTablePreEvent: event fired before the table is dropped.
  - DropTableEvent: event fired after the table has been dropped.
- Function
  - CreateFunctionPreEvent: event fired before the function is created.
  - CreateFunctionEvent: event fired after the function has been created.
  - RenameFunctionPreEvent: event fired before the function is renamed.
  - RenameFunctionEvent: event fired after the function has been renamed.
  - DropFunctionPreEvent: event fired before the function is dropped.
  - DropFunctionEvent: event fired after the function has been dropped.

The events currently contain only the names of the modified objects. We will add more events, and more details, at a later point.

A user can monitor changes to the external catalog by adding a listener to the Spark listener bus and checking for `ExternalCatalogEvent`s using the `SparkListener.onOtherEvent` hook. A more direct approach is to add a listener directly to the `ExternalCatalog`.

## How was this patch tested?
Added the `ExternalCatalogEventSuite`.

Author: Herman van Hovell <hv...@databricks.com>

Closes #17710 from hvanhovell/SPARK-20420.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e2b3d236
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e2b3d236
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e2b3d236

Branch: refs/heads/master
Commit: e2b3d2367a563d4600d8d87b5317e71135c362f0
Parents: 48d760d
Author: Herman van Hovell <hv...@databricks.com>
Authored: Fri Apr 21 00:05:03 2017 -0700
Committer: Reynold Xin <rx...@databricks.com>
Committed: Fri Apr 21 00:05:03 2017 -0700

----------------------------------------------------------------------
 .../sql/catalyst/catalog/ExternalCatalog.scala  |  85 ++++++++-
 .../sql/catalyst/catalog/InMemoryCatalog.scala  |  22 ++-
 .../spark/sql/catalyst/catalog/events.scala     | 158 ++++++++++++++++
 .../catalog/ExternalCatalogEventSuite.scala     | 188 +++++++++++++++++++
 .../apache/spark/sql/internal/SharedState.scala |   7 +
 .../spark/sql/hive/HiveExternalCatalog.scala    |  22 ++-
 6 files changed, 457 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/e2b3d236/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala
index 08a01e8..974ef90 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.catalyst.catalog
 import org.apache.spark.sql.catalyst.analysis.{FunctionAlreadyExistsException, NoSuchDatabaseException, NoSuchFunctionException, NoSuchTableException}
 import org.apache.spark.sql.catalyst.expressions.Expression
 import org.apache.spark.sql.types.StructType
+import org.apache.spark.util.ListenerBus
 
 /**
  * Interface for the system catalog (of functions, partitions, tables, and databases).
@@ -30,7 +31,8 @@ import org.apache.spark.sql.types.StructType
  *
  * Implementations should throw [[NoSuchDatabaseException]] when databases don't exist.
  */
-abstract class ExternalCatalog {
+abstract class ExternalCatalog
+  extends ListenerBus[ExternalCatalogEventListener, ExternalCatalogEvent] {
   import CatalogTypes.TablePartitionSpec
 
   protected def requireDbExists(db: String): Unit = {
@@ -61,9 +63,22 @@ abstract class ExternalCatalog {
   // Databases
   // --------------------------------------------------------------------------
 
-  def createDatabase(dbDefinition: CatalogDatabase, ignoreIfExists: Boolean): Unit
+  final def createDatabase(dbDefinition: CatalogDatabase, ignoreIfExists: Boolean): Unit = {
+    val db = dbDefinition.name
+    postToAll(CreateDatabasePreEvent(db))
+    doCreateDatabase(dbDefinition, ignoreIfExists)
+    postToAll(CreateDatabaseEvent(db))
+  }
+
+  protected def doCreateDatabase(dbDefinition: CatalogDatabase, ignoreIfExists: Boolean): Unit
+
+  final def dropDatabase(db: String, ignoreIfNotExists: Boolean, cascade: Boolean): Unit = {
+    postToAll(DropDatabasePreEvent(db))
+    doDropDatabase(db, ignoreIfNotExists, cascade)
+    postToAll(DropDatabaseEvent(db))
+  }
 
-  def dropDatabase(db: String, ignoreIfNotExists: Boolean, cascade: Boolean): Unit
+  protected def doDropDatabase(db: String, ignoreIfNotExists: Boolean, cascade: Boolean): Unit
 
   /**
    * Alter a database whose name matches the one specified in `dbDefinition`,
@@ -88,11 +103,39 @@ abstract class ExternalCatalog {
   // Tables
   // --------------------------------------------------------------------------
 
-  def createTable(tableDefinition: CatalogTable, ignoreIfExists: Boolean): Unit
+  final def createTable(tableDefinition: CatalogTable, ignoreIfExists: Boolean): Unit = {
+    val db = tableDefinition.database
+    val name = tableDefinition.identifier.table
+    postToAll(CreateTablePreEvent(db, name))
+    doCreateTable(tableDefinition, ignoreIfExists)
+    postToAll(CreateTableEvent(db, name))
+  }
 
-  def dropTable(db: String, table: String, ignoreIfNotExists: Boolean, purge: Boolean): Unit
+  protected def doCreateTable(tableDefinition: CatalogTable, ignoreIfExists: Boolean): Unit
 
-  def renameTable(db: String, oldName: String, newName: String): Unit
+  final def dropTable(
+      db: String,
+      table: String,
+      ignoreIfNotExists: Boolean,
+      purge: Boolean): Unit = {
+    postToAll(DropTablePreEvent(db, table))
+    doDropTable(db, table, ignoreIfNotExists, purge)
+    postToAll(DropTableEvent(db, table))
+  }
+
+  protected def doDropTable(
+      db: String,
+      table: String,
+      ignoreIfNotExists: Boolean,
+      purge: Boolean): Unit
+
+  final def renameTable(db: String, oldName: String, newName: String): Unit = {
+    postToAll(RenameTablePreEvent(db, oldName, newName))
+    doRenameTable(db, oldName, newName)
+    postToAll(RenameTableEvent(db, oldName, newName))
+  }
+
+  protected def doRenameTable(db: String, oldName: String, newName: String): Unit
 
   /**
    * Alter a table whose database and name match the ones specified in `tableDefinition`, assuming
@@ -269,11 +312,30 @@ abstract class ExternalCatalog {
   // Functions
   // --------------------------------------------------------------------------
 
-  def createFunction(db: String, funcDefinition: CatalogFunction): Unit
+  final def createFunction(db: String, funcDefinition: CatalogFunction): Unit = {
+    val name = funcDefinition.identifier.funcName
+    postToAll(CreateFunctionPreEvent(db, name))
+    doCreateFunction(db, funcDefinition)
+    postToAll(CreateFunctionEvent(db, name))
+  }
 
-  def dropFunction(db: String, funcName: String): Unit
+  protected def doCreateFunction(db: String, funcDefinition: CatalogFunction): Unit
 
-  def renameFunction(db: String, oldName: String, newName: String): Unit
+  final def dropFunction(db: String, funcName: String): Unit = {
+    postToAll(DropFunctionPreEvent(db, funcName))
+    doDropFunction(db, funcName)
+    postToAll(DropFunctionEvent(db, funcName))
+  }
+
+  protected def doDropFunction(db: String, funcName: String): Unit
+
+  final def renameFunction(db: String, oldName: String, newName: String): Unit = {
+    postToAll(RenameFunctionPreEvent(db, oldName, newName))
+    doRenameFunction(db, oldName, newName)
+    postToAll(RenameFunctionEvent(db, oldName, newName))
+  }
+
+  protected def doRenameFunction(db: String, oldName: String, newName: String): Unit
 
   def getFunction(db: String, funcName: String): CatalogFunction
 
@@ -281,4 +343,9 @@ abstract class ExternalCatalog {
 
   def listFunctions(db: String, pattern: String): Seq[String]
 
+  override protected def doPostEvent(
+      listener: ExternalCatalogEventListener,
+      event: ExternalCatalogEvent): Unit = {
+    listener.onEvent(event)
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/e2b3d236/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
index 9ca1c71..81dd8ef 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
@@ -98,7 +98,7 @@ class InMemoryCatalog(
   // Databases
   // --------------------------------------------------------------------------
 
-  override def createDatabase(
+  override protected def doCreateDatabase(
       dbDefinition: CatalogDatabase,
       ignoreIfExists: Boolean): Unit = synchronized {
     if (catalog.contains(dbDefinition.name)) {
@@ -119,7 +119,7 @@ class InMemoryCatalog(
     }
   }
 
-  override def dropDatabase(
+  override protected def doDropDatabase(
       db: String,
       ignoreIfNotExists: Boolean,
       cascade: Boolean): Unit = synchronized {
@@ -180,7 +180,7 @@ class InMemoryCatalog(
   // Tables
   // --------------------------------------------------------------------------
 
-  override def createTable(
+  override protected def doCreateTable(
       tableDefinition: CatalogTable,
       ignoreIfExists: Boolean): Unit = synchronized {
     assert(tableDefinition.identifier.database.isDefined)
@@ -221,7 +221,7 @@ class InMemoryCatalog(
     }
   }
 
-  override def dropTable(
+  override protected def doDropTable(
       db: String,
       table: String,
       ignoreIfNotExists: Boolean,
@@ -264,7 +264,10 @@ class InMemoryCatalog(
     }
   }
 
-  override def renameTable(db: String, oldName: String, newName: String): Unit = synchronized {
+  override protected def doRenameTable(
+      db: String,
+      oldName: String,
+      newName: String): Unit = synchronized {
     requireTableExists(db, oldName)
     requireTableNotExists(db, newName)
     val oldDesc = catalog(db).tables(oldName)
@@ -565,18 +568,21 @@ class InMemoryCatalog(
   // Functions
   // --------------------------------------------------------------------------
 
-  override def createFunction(db: String, func: CatalogFunction): Unit = synchronized {
+  override protected def doCreateFunction(db: String, func: CatalogFunction): Unit = synchronized {
     requireDbExists(db)
     requireFunctionNotExists(db, func.identifier.funcName)
     catalog(db).functions.put(func.identifier.funcName, func)
   }
 
-  override def dropFunction(db: String, funcName: String): Unit = synchronized {
+  override protected def doDropFunction(db: String, funcName: String): Unit = synchronized {
     requireFunctionExists(db, funcName)
     catalog(db).functions.remove(funcName)
   }
 
-  override def renameFunction(db: String, oldName: String, newName: String): Unit = synchronized {
+  override protected def doRenameFunction(
+      db: String,
+      oldName: String,
+      newName: String): Unit = synchronized {
     requireFunctionExists(db, oldName)
     requireFunctionNotExists(db, newName)
     val newFunc = getFunction(db, oldName).copy(identifier = FunctionIdentifier(newName, Some(db)))

http://git-wip-us.apache.org/repos/asf/spark/blob/e2b3d236/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/events.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/events.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/events.scala
new file mode 100644
index 0000000..459973a
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/events.scala
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.catalyst.catalog
+
+import org.apache.spark.scheduler.SparkListenerEvent
+
+/**
+ * Event emitted by the external catalog when it is modified. Events are either fired before or
+ * after the modification (the event should document this).
+ */
+trait ExternalCatalogEvent extends SparkListenerEvent
+
+/**
+ * Listener interface for external catalog modification events.
+ */
+trait ExternalCatalogEventListener {
+  def onEvent(event: ExternalCatalogEvent): Unit
+}
+
+/**
+ * Event fired when a database is created or dropped.
+ */
+trait DatabaseEvent extends ExternalCatalogEvent {
+  /**
+   * Database of the object that was touched.
+   */
+  val database: String
+}
+
+/**
+ * Event fired before a database is created.
+ */
+case class CreateDatabasePreEvent(database: String) extends DatabaseEvent
+
+/**
+ * Event fired after a database has been created.
+ */
+case class CreateDatabaseEvent(database: String) extends DatabaseEvent
+
+/**
+ * Event fired before a database is dropped.
+ */
+case class DropDatabasePreEvent(database: String) extends DatabaseEvent
+
+/**
+ * Event fired after a database has been dropped.
+ */
+case class DropDatabaseEvent(database: String) extends DatabaseEvent
+
+/**
+ * Event fired when a table is created, dropped or renamed.
+ */
+trait TableEvent extends DatabaseEvent {
+  /**
+   * Name of the table that was touched.
+   */
+  val name: String
+}
+
+/**
+ * Event fired before a table is created.
+ */
+case class CreateTablePreEvent(database: String, name: String) extends TableEvent
+
+/**
+ * Event fired after a table has been created.
+ */
+case class CreateTableEvent(database: String, name: String) extends TableEvent
+
+/**
+ * Event fired before a table is dropped.
+ */
+case class DropTablePreEvent(database: String, name: String) extends TableEvent
+
+/**
+ * Event fired after a table has been dropped.
+ */
+case class DropTableEvent(database: String, name: String) extends TableEvent
+
+/**
+ * Event fired before a table is renamed.
+ */
+case class RenameTablePreEvent(
+    database: String,
+    name: String,
+    newName: String)
+  extends TableEvent
+
+/**
+ * Event fired after a table has been renamed.
+ */
+case class RenameTableEvent(
+    database: String,
+    name: String,
+    newName: String)
+  extends TableEvent
+
+/**
+ * Event fired when a function is created, dropped or renamed.
+ */
+trait FunctionEvent extends DatabaseEvent {
+  /**
+   * Name of the function that was touched.
+   */
+  val name: String
+}
+
+/**
+ * Event fired before a function is created.
+ */
+case class CreateFunctionPreEvent(database: String, name: String) extends FunctionEvent
+
+/**
+ * Event fired after a function has been created.
+ */
+case class CreateFunctionEvent(database: String, name: String) extends FunctionEvent
+
+/**
+ * Event fired before a function is dropped.
+ */
+case class DropFunctionPreEvent(database: String, name: String) extends FunctionEvent
+
+/**
+ * Event fired after a function has been dropped.
+ */
+case class DropFunctionEvent(database: String, name: String) extends FunctionEvent
+
+/**
+ * Event fired before a function is renamed.
+ */
+case class RenameFunctionPreEvent(
+    database: String,
+    name: String,
+    newName: String)
+  extends FunctionEvent
+
+/**
+ * Event fired after a function has been renamed.
+ */
+case class RenameFunctionEvent(
+    database: String,
+    name: String,
+    newName: String)
+  extends FunctionEvent

http://git-wip-us.apache.org/repos/asf/spark/blob/e2b3d236/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogEventSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogEventSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogEventSuite.scala
new file mode 100644
index 0000000..2539ea6
--- /dev/null
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogEventSuite.scala
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.catalyst.catalog
+
+import java.net.URI
+import java.nio.file.{Files, Path}
+
+import scala.collection.mutable
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
+import org.apache.spark.sql.types.StructType
+
+/**
+ * Test Suite for external catalog events
+ */
+class ExternalCatalogEventSuite extends SparkFunSuite {
+
+  protected def newCatalog: ExternalCatalog = new InMemoryCatalog()
+
+  private def testWithCatalog(
+      name: String)(
+      f: (ExternalCatalog, Seq[ExternalCatalogEvent] => Unit) => Unit): Unit = test(name) {
+    val catalog = newCatalog
+    val recorder = mutable.Buffer.empty[ExternalCatalogEvent]
+    catalog.addListener(new ExternalCatalogEventListener {
+      override def onEvent(event: ExternalCatalogEvent): Unit = {
+        recorder += event
+      }
+    })
+    f(catalog, (expected: Seq[ExternalCatalogEvent]) => {
+      val actual = recorder.clone()
+      recorder.clear()
+      assert(expected === actual)
+    })
+  }
+
+  private def createDbDefinition(uri: URI): CatalogDatabase = {
+    CatalogDatabase(name = "db5", description = "", locationUri = uri, Map.empty)
+  }
+
+  private def createDbDefinition(): CatalogDatabase = {
+    createDbDefinition(preparePath(Files.createTempDirectory("db_")))
+  }
+
+  private def preparePath(path: Path): URI = path.normalize().toUri
+
+  testWithCatalog("database") { (catalog, checkEvents) =>
+    // CREATE
+    val dbDefinition = createDbDefinition()
+
+    catalog.createDatabase(dbDefinition, ignoreIfExists = false)
+    checkEvents(CreateDatabasePreEvent("db5") :: CreateDatabaseEvent("db5") :: Nil)
+
+    catalog.createDatabase(dbDefinition, ignoreIfExists = true)
+    checkEvents(CreateDatabasePreEvent("db5") :: CreateDatabaseEvent("db5") :: Nil)
+
+    intercept[AnalysisException] {
+      catalog.createDatabase(dbDefinition, ignoreIfExists = false)
+    }
+    checkEvents(CreateDatabasePreEvent("db5") :: Nil)
+
+    // DROP
+    intercept[AnalysisException] {
+      catalog.dropDatabase("db4", ignoreIfNotExists = false, cascade = false)
+    }
+    checkEvents(DropDatabasePreEvent("db4") :: Nil)
+
+    catalog.dropDatabase("db5", ignoreIfNotExists = false, cascade = false)
+    checkEvents(DropDatabasePreEvent("db5") :: DropDatabaseEvent("db5") :: Nil)
+
+    catalog.dropDatabase("db4", ignoreIfNotExists = true, cascade = false)
+    checkEvents(DropDatabasePreEvent("db4") :: DropDatabaseEvent("db4") :: Nil)
+  }
+
+  testWithCatalog("table") { (catalog, checkEvents) =>
+    val path1 = Files.createTempDirectory("db_")
+    val path2 = Files.createTempDirectory(path1, "tbl_")
+    val uri1 = preparePath(path1)
+    val uri2 = preparePath(path2)
+
+    // CREATE
+    val dbDefinition = createDbDefinition(uri1)
+
+    val storage = CatalogStorageFormat.empty.copy(
+      locationUri = Option(uri2))
+    val tableDefinition = CatalogTable(
+      identifier = TableIdentifier("tbl1", Some("db5")),
+      tableType = CatalogTableType.MANAGED,
+      storage = storage,
+      schema = new StructType().add("id", "long"))
+
+    catalog.createDatabase(dbDefinition, ignoreIfExists = false)
+    checkEvents(CreateDatabasePreEvent("db5") :: CreateDatabaseEvent("db5") :: Nil)
+
+    catalog.createTable(tableDefinition, ignoreIfExists = false)
+    checkEvents(CreateTablePreEvent("db5", "tbl1") :: CreateTableEvent("db5", "tbl1") :: Nil)
+
+    catalog.createTable(tableDefinition, ignoreIfExists = true)
+    checkEvents(CreateTablePreEvent("db5", "tbl1") :: CreateTableEvent("db5", "tbl1") :: Nil)
+
+    intercept[AnalysisException] {
+      catalog.createTable(tableDefinition, ignoreIfExists = false)
+    }
+    checkEvents(CreateTablePreEvent("db5", "tbl1") :: Nil)
+
+    // RENAME
+    catalog.renameTable("db5", "tbl1", "tbl2")
+    checkEvents(
+      RenameTablePreEvent("db5", "tbl1", "tbl2") ::
+      RenameTableEvent("db5", "tbl1", "tbl2") :: Nil)
+
+    intercept[AnalysisException] {
+      catalog.renameTable("db5", "tbl1", "tbl2")
+    }
+    checkEvents(RenameTablePreEvent("db5", "tbl1", "tbl2") :: Nil)
+
+    // DROP
+    intercept[AnalysisException] {
+      catalog.dropTable("db5", "tbl1", ignoreIfNotExists = false, purge = true)
+    }
+    checkEvents(DropTablePreEvent("db5", "tbl1") :: Nil)
+
+    catalog.dropTable("db5", "tbl2", ignoreIfNotExists = false, purge = true)
+    checkEvents(DropTablePreEvent("db5", "tbl2") :: DropTableEvent("db5", "tbl2") :: Nil)
+
+    catalog.dropTable("db5", "tbl2", ignoreIfNotExists = true, purge = true)
+    checkEvents(DropTablePreEvent("db5", "tbl2") :: DropTableEvent("db5", "tbl2") :: Nil)
+  }
+
+  testWithCatalog("function") { (catalog, checkEvents) =>
+    // CREATE
+    val dbDefinition = createDbDefinition()
+
+    val functionDefinition = CatalogFunction(
+      identifier = FunctionIdentifier("fn7", Some("db5")),
+      className = "",
+      resources = Seq.empty)
+
+    val newIdentifier = functionDefinition.identifier.copy(funcName = "fn4")
+    val renamedFunctionDefinition = functionDefinition.copy(identifier = newIdentifier)
+
+    catalog.createDatabase(dbDefinition, ignoreIfExists = false)
+    checkEvents(CreateDatabasePreEvent("db5") :: CreateDatabaseEvent("db5") :: Nil)
+
+    catalog.createFunction("db5", functionDefinition)
+    checkEvents(CreateFunctionPreEvent("db5", "fn7") :: CreateFunctionEvent("db5", "fn7") :: Nil)
+
+    intercept[AnalysisException] {
+      catalog.createFunction("db5", functionDefinition)
+    }
+    checkEvents(CreateFunctionPreEvent("db5", "fn7") :: Nil)
+
+    // RENAME
+    catalog.renameFunction("db5", "fn7", "fn4")
+    checkEvents(
+      RenameFunctionPreEvent("db5", "fn7", "fn4") ::
+      RenameFunctionEvent("db5", "fn7", "fn4") :: Nil)
+    intercept[AnalysisException] {
+      catalog.renameFunction("db5", "fn7", "fn4")
+    }
+    checkEvents(RenameFunctionPreEvent("db5", "fn7", "fn4") :: Nil)
+
+    // DROP
+    intercept[AnalysisException] {
+      catalog.dropFunction("db5", "fn7")
+    }
+    checkEvents(DropFunctionPreEvent("db5", "fn7") :: Nil)
+
+    catalog.dropFunction("db5", "fn4")
+    checkEvents(DropFunctionPreEvent("db5", "fn4") :: DropFunctionEvent("db5", "fn4") :: Nil)
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/e2b3d236/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala
index d06dbaa..f834569 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala
@@ -109,6 +109,13 @@ private[sql] class SharedState(val sparkContext: SparkContext) extends Logging {
     }
   }
 
+  // Make sure we propagate external catalog events to the spark listener bus
+  externalCatalog.addListener(new ExternalCatalogEventListener {
+    override def onEvent(event: ExternalCatalogEvent): Unit = {
+      sparkContext.listenerBus.post(event)
+    }
+  })
+
   /**
    * A manager for global temporary views.
    */

http://git-wip-us.apache.org/repos/asf/spark/blob/e2b3d236/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
index 8b0fdf4..71e33c4 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
@@ -141,13 +141,13 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
   // Databases
   // --------------------------------------------------------------------------
 
-  override def createDatabase(
+  override protected def doCreateDatabase(
       dbDefinition: CatalogDatabase,
       ignoreIfExists: Boolean): Unit = withClient {
     client.createDatabase(dbDefinition, ignoreIfExists)
   }
 
-  override def dropDatabase(
+  override protected def doDropDatabase(
       db: String,
       ignoreIfNotExists: Boolean,
       cascade: Boolean): Unit = withClient {
@@ -194,7 +194,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
   // Tables
   // --------------------------------------------------------------------------
 
-  override def createTable(
+  override protected def doCreateTable(
       tableDefinition: CatalogTable,
       ignoreIfExists: Boolean): Unit = withClient {
     assert(tableDefinition.identifier.database.isDefined)
@@ -456,7 +456,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
     }
   }
 
-  override def dropTable(
+  override protected def doDropTable(
       db: String,
       table: String,
       ignoreIfNotExists: Boolean,
@@ -465,7 +465,10 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
     client.dropTable(db, table, ignoreIfNotExists, purge)
   }
 
-  override def renameTable(db: String, oldName: String, newName: String): Unit = withClient {
+  override protected def doRenameTable(
+      db: String,
+      oldName: String,
+      newName: String): Unit = withClient {
     val rawTable = getRawTable(db, oldName)
 
     // Note that Hive serde tables don't use path option in storage properties to store the value
@@ -1056,7 +1059,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
   // Functions
   // --------------------------------------------------------------------------
 
-  override def createFunction(
+  override protected def doCreateFunction(
       db: String,
       funcDefinition: CatalogFunction): Unit = withClient {
     requireDbExists(db)
@@ -1069,12 +1072,15 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
     client.createFunction(db, funcDefinition.copy(identifier = functionIdentifier))
   }
 
-  override def dropFunction(db: String, name: String): Unit = withClient {
+  override protected def doDropFunction(db: String, name: String): Unit = withClient {
     requireFunctionExists(db, name)
     client.dropFunction(db, name)
   }
 
-  override def renameFunction(db: String, oldName: String, newName: String): Unit = withClient {
+  override protected def doRenameFunction(
+      db: String,
+      oldName: String,
+      newName: String): Unit = withClient {
     requireFunctionExists(db, oldName)
     requireFunctionNotExists(db, newName)
     client.renameFunction(db, oldName, newName)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org