You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2018/10/31 04:25:20 UTC

[GitHub] bowenli86 closed pull request #6969: [FLINK-10556][Table API & SQL]Add APIs to ExternalCatalog, CrudExternalCatalog and InMemoryCrudExternalCatalog for views and UDFs

bowenli86 closed pull request #6969: [FLINK-10556][Table API & SQL]Add APIs to ExternalCatalog, CrudExternalCatalog and InMemoryCrudExternalCatalog for views and UDFs
URL: https://github.com/apache/flink/pull/6969
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/exceptions.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/exceptions.scala
index 7fc7de50e07..0f1c80df544 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/exceptions.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/exceptions.scala
@@ -107,6 +107,70 @@ case class CatalogAlreadyExistException(
   def this(catalog: String) = this(catalog, null)
 }
 
+/**
+  * Exception for operation on a nonexistent view
+  *
+  * @param catalog catalog name
+  * @param view view name
+  * @param cause the cause
+  */
+case class ViewNotExistException(
+      catalog: String,
+      view: String,
+      cause: Throwable)
+  extends RuntimeException(s"View $view does not exist.", cause) {
+
+  def this(catalog: String, view: String) = this(catalog, view, null)
+}
+
+/**
+  * Exception for adding an already existent view
+  *
+  * @param catalog catalog name
+  * @param view view name
+  * @param cause the cause
+  */
+case class ViewAlreadyExistException(
+      catalog: String,
+      view: String,
+      cause: Throwable)
+  extends RuntimeException(s"View $view already exists.", cause) {
+
+  def this(catalog: String, view: String) = this(catalog, view, null)
+}
+
+/**
+  * Exception for operation on a nonexistent function
+  *
+  * @param catalog catalog name
+  * @param function function name
+  * @param cause the cause
+  */
+case class FunctionNotExistException(
+      catalog: String,
+      function: String,
+      cause: Throwable)
+  extends RuntimeException(s"Function $function does not exist.", cause) {
+
+  def this(catalog: String, function: String) = this(catalog, function, null)
+}
+
+/**
+  * Exception for adding an already existent function
+  *
+  * @param catalog catalog name
+  * @param function function name
+  * @param cause the cause
+  */
+case class FunctionAlreadyExistException(
+      catalog: String,
+      function: String,
+      cause: Throwable)
+  extends RuntimeException(s"Function $function already exists.", cause) {
+
+  def this(catalog: String, function: String) = this(catalog, function, null)
+}
+
 /**
   * Exception for not finding a [[TableFactory]] for the given properties.
   *
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/CrudExternalCatalog.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/CrudExternalCatalog.scala
index 4db9497a712..02bdbed21fc 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/CrudExternalCatalog.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/CrudExternalCatalog.scala
@@ -19,9 +19,11 @@
 package org.apache.flink.table.catalog
 
 import org.apache.flink.table.api._
+import org.apache.flink.table.functions.UserDefinedFunction
 
 /**
-  * The CrudExternalCatalog provides methods to create, drop, and alter (sub-)catalogs or tables.
+  * The CrudExternalCatalog provides methods to create, drop, and alter (sub-)catalogs, tables,
+  * views and UDFs.
   */
 trait CrudExternalCatalog extends ExternalCatalog {
 
@@ -103,4 +105,86 @@ trait CrudExternalCatalog extends ExternalCatalog {
   @throws[CatalogNotExistException]
   def alterSubCatalog(name: String, catalog: ExternalCatalog, ignoreIfNotExists: Boolean): Unit
 
+  /**
+    * Adds a view to this catalog.
+    *
+    * @param viewName      The name of the view to add.
+    * @param view          The view to add.
+    * @param ignoreIfExists Flag to specify behavior if a view with the given name already exists:
+    *                       if set to false, throw an exception,
+    *                       if set to true, nothing happens.
+    * @throws ViewAlreadyExistException thrown if view already exists and ignoreIfExists is false
+    */
+  @throws[ViewAlreadyExistException]
+  def createView(viewName: String, view: String, ignoreIfExists: Boolean): Unit
+
+  /**
+    * Deletes a view from this catalog.
+    *
+    * @param viewName         Name of the view to delete.
+    * @param ignoreIfNotExists Flag to specify behavior if the view does not exist:
+    *                          if set to false, throw an exception,
+    *                          if set to true, nothing happens.
+    * @throws ViewNotExistException    thrown if the view does not exist in the catalog
+    */
+  @throws[ViewNotExistException]
+  def dropView(viewName: String, ignoreIfNotExists: Boolean): Unit
+
+  /**
+    * Modifies an existing view of this catalog.
+    *
+    * @param viewName         The name of the view to modify.
+    * @param view             The new view which replaces the existing view.
+    * @param ignoreIfNotExists Flag to specify behavior if the view does not exist:
+    *                          if set to false, throw an exception,
+    *                          if set to true, nothing happens.
+    * @throws ViewNotExistException   thrown if the view does not exist in the catalog
+    */
+  @throws[ViewNotExistException]
+  def alterView(viewName: String, view: String, ignoreIfNotExists: Boolean): Unit
+
+  /**
+    * Adds a UDF to this catalog.
+    *
+    * @param functionName      The name of the function to add.
+    * @param function          The function to add.
+    * @param ignoreIfExists Flag to specify behavior if function with the given name already exists:
+    *                       if set to false, throw an exception,
+    *                       if set to true, nothing happens.
+    * @throws FunctionAlreadyExistException thrown if function already exists and ignoreIfExists
+    *                                       is false
+    */
+  @throws[FunctionAlreadyExistException]
+  def createFunction(
+    functionName: String,
+    function: UserDefinedFunction,
+    ignoreIfExists: Boolean): Unit
+
+  /**
+    * Deletes a UDF from this catalog.
+    *
+    * @param functionName         Name of the function to delete.
+    * @param ignoreIfNotExists Flag to specify behavior if the function does not exist:
+    *                          if set to false, throw an exception,
+    *                          if set to true, nothing happens.
+    * @throws FunctionNotExistException    thrown if the function does not exist in the catalog
+    */
+  @throws[FunctionNotExistException]
+  def dropFunction(functionName: String, ignoreIfNotExists: Boolean): Unit
+
+  /**
+    * Modifies an existing UDF of this catalog.
+    *
+    * @param functionName         The name of the function to modify.
+    * @param function             The new function which replaces the existing function.
+    * @param ignoreIfNotExists Flag to specify behavior if the function does not exist:
+    *                          if set to false, throw an exception,
+    *                          if set to true, nothing happens.
+    * @throws FunctionNotExistException   thrown if the function does not exist in the catalog
+    */
+  @throws[FunctionNotExistException]
+  def alterFunction(
+    functionName: String,
+    function: UserDefinedFunction,
+    ignoreIfNotExists: Boolean): Unit
 }
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalog.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalog.scala
index 5f4511b1efd..00567b0778a 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalog.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalog.scala
@@ -21,13 +21,14 @@ package org.apache.flink.table.catalog
 import java.util.{List => JList}
 
 import org.apache.flink.table.api._
+import org.apache.flink.table.functions.UserDefinedFunction
 
 /**
   * An [[ExternalCatalog]] is the connector between an external database catalog and Flink's
   * Table API.
   *
-  * It provides information about catalogs, databases and tables such as names, schema, statistics,
-  * and access information.
+  * It provides information about catalogs, databases, tables, views, and UDFs, such as names,
+  * schema, statistics, and access information.
   */
 trait ExternalCatalog {
 
@@ -63,4 +64,37 @@ trait ExternalCatalog {
     */
   def listSubCatalogs(): JList[String]
 
+  /**
+    * Gets a view's definition, which is a string such as "select xxx from yyy", from this catalog.
+    * This might be an oversimplification, but we should be okay for now. In the future,
+    * we may have a class representation such as ExternalCatalogView.
+    *
+    * @param viewName The view's name
+    * @return The view's definition, which is a string such as "select xxx from yyy".
+    */
+  @throws[ViewNotExistException]
+  def getView(viewName: String): String
+
+  /**
+    * Gets the names of all views registered in this catalog.
+    *
+    * @return The list of names of all registered views.
+    */
+  def listViews(): JList[String]
+
+  /**
+    * Gets a UDF from this catalog.
+    *
+    * @param functionName The function's name
+    * @return The requested UDF
+    */
+  @throws[FunctionNotExistException]
+  def getFunction(functionName: String): UserDefinedFunction
+
+  /**
+    * Gets the names of all UDFs registered in this catalog.
+    *
+    * @return The list of names of all registered UDFs.
+    */
+  def listFunctions(): JList[String]
 }
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/InMemoryExternalCatalog.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/InMemoryExternalCatalog.scala
index ee30a8ed42a..bbd5fade480 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/InMemoryExternalCatalog.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/InMemoryExternalCatalog.scala
@@ -20,10 +20,11 @@ package org.apache.flink.table.catalog
 
 import java.util.{List => JList}
 
-import org.apache.flink.table.api.{CatalogAlreadyExistException, CatalogNotExistException, TableAlreadyExistException, TableNotExistException}
+import org.apache.flink.table.api._
+import org.apache.flink.table.functions.UserDefinedFunction
 
-import scala.collection.mutable
-import scala.collection.JavaConverters._
+import _root_.scala.collection.mutable
+import _root_.scala.collection.JavaConverters._
 
 /**
   * This class is an in-memory implementation of [[ExternalCatalog]].
@@ -36,6 +37,8 @@ class InMemoryExternalCatalog(name: String) extends CrudExternalCatalog {
 
   private val databases = new mutable.HashMap[String, ExternalCatalog]
   private val tables = new mutable.HashMap[String, ExternalCatalogTable]
+  private val views = new mutable.HashMap[String, String]
+  private val functions = new mutable.HashMap[String, UserDefinedFunction]
 
   @throws[TableAlreadyExistException]
   override def createTable(
@@ -98,6 +101,77 @@ class InMemoryExternalCatalog(name: String) extends CrudExternalCatalog {
     }
   }
 
+  @throws[ViewAlreadyExistException]
+  override def createView(
+    viewName: String,
+    view: String,
+    ignoreIfExists: Boolean): Unit = synchronized {
+    if (!views.contains(viewName)) {
+      views.put(viewName, view)
+    } else if (!ignoreIfExists) { // with ignoreIfExists set, an existing view is left untouched
+      throw new ViewAlreadyExistException(name, viewName)
+    }
+  }
+
+  @throws[ViewNotExistException]
+  override def dropView(
+    viewName: String,
+    ignoreIfNotExists: Boolean): Unit = synchronized {
+
+    if (views.remove(viewName).isEmpty && !ignoreIfNotExists) {
+      throw new ViewNotExistException(name, viewName)
+    }
+  }
+
+  @throws[ViewNotExistException]
+  override def alterView(
+    viewName: String,
+    view: String,
+    ignoreIfNotExists: Boolean): Unit = synchronized {
+
+    if (views.contains(viewName)) {
+      views.put(viewName, view)
+    } else if (!ignoreIfNotExists) {
+      throw new ViewNotExistException(name, viewName)
+    }
+  }
+
+  @throws[FunctionAlreadyExistException]
+  override def createFunction(
+    functionName: String,
+    function: UserDefinedFunction,
+    ignoreIfExists: Boolean): Unit = synchronized {
+    if (!functions.contains(functionName)) {
+      functions.put(functionName, function)
+    } else if (!ignoreIfExists) { // with ignoreIfExists set, an existing UDF is left untouched
+      throw new FunctionAlreadyExistException(name, functionName)
+    }
+  }
+
+  @throws[FunctionNotExistException]
+  override def dropFunction(
+    functionName: String,
+    ignoreIfNotExists: Boolean): Unit = synchronized {
+
+    if (functions.remove(functionName).isEmpty && !ignoreIfNotExists) {
+      throw new FunctionNotExistException(name, functionName)
+    }
+  }
+
+  @throws[FunctionNotExistException]
+  override def alterFunction(
+    functionName: String,
+    function: UserDefinedFunction,
+    ignoreIfNotExists: Boolean): Unit = synchronized {
+
+    if (functions.contains(functionName)) {
+      functions.put(functionName, function)
+    } else if (!ignoreIfNotExists) {
+      throw new FunctionNotExistException(name, functionName)
+    }
+  }
+
+  @throws[TableNotExistException]
   override def getTable(tableName: String): ExternalCatalogTable = synchronized {
     tables.get(tableName) match {
       case Some(t) => t
@@ -120,4 +194,28 @@ class InMemoryExternalCatalog(name: String) extends CrudExternalCatalog {
   override def listSubCatalogs(): JList[String] = synchronized {
     databases.keys.toList.asJava
   }
+
+  @throws[ViewNotExistException]
+  override def getView(viewName: String): String = synchronized {
+    views.get(viewName) match {
+      case Some(v) => v
+      case None => throw new ViewNotExistException(name, viewName) // unknown view name
+    }
+  }
+
+  override def listViews(): JList[String] = synchronized {
+    views.keys.toList.asJava
+  }
+
+  @throws[FunctionNotExistException]
+  override def getFunction(functionName: String): UserDefinedFunction = synchronized {
+    functions.get(functionName) match {
+      case Some(f) => f
+      case None => throw new FunctionNotExistException(name, functionName) // unknown UDF name
+    }
+  }
+
+  override def listFunctions(): JList[String] = synchronized {
+    functions.keys.toList.asJava
+  }
 }
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/catalog/InMemoryExternalCatalogTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/catalog/InMemoryExternalCatalogTest.scala
index 5238bfe0cb9..b0a084cbb97 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/catalog/InMemoryExternalCatalogTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/catalog/InMemoryExternalCatalogTest.scala
@@ -21,6 +21,7 @@ package org.apache.flink.table.catalog
 import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
 import org.apache.flink.table.api._
 import org.apache.flink.table.descriptors.{ConnectorDescriptor, Schema}
+import org.apache.flink.table.functions.UserDefinedFunction
 import org.junit.Assert._
 import org.junit.{Before, Test}
 
@@ -35,6 +36,8 @@ class InMemoryExternalCatalogTest {
     catalog = new InMemoryExternalCatalog(databaseName)
   }
 
+  // ------ Table ------
+
   @Test
   def testCreateTable(): Unit = {
     assertTrue(catalog.listTables().isEmpty)
@@ -96,6 +99,8 @@ class InMemoryExternalCatalogTest {
     catalog.dropTable("nonexisted", ignoreIfNotExists = false)
   }
 
+  // ------ SubCatalog ------
+
   @Test(expected = classOf[CatalogNotExistException])
   def testGetNotExistDatabase(): Unit = {
     catalog.getSubCatalog("notexistedDb")
@@ -133,6 +138,136 @@ class InMemoryExternalCatalogTest {
     assertEquals("table", tables.get(0))
   }
 
+  // ------ View ------
+
+  @Test
+  def testCreateView(): Unit = {
+    assertTrue(catalog.listViews().isEmpty)
+    catalog.createView("v1", createViewInstance(), ignoreIfExists = false)
+    val views = catalog.listViews()
+    assertEquals(1, views.size())
+    assertEquals("v1", views.get(0))
+  }
+
+  @Test(expected = classOf[ViewAlreadyExistException])
+  def testCreateExistedView(): Unit = {
+    val viewName = "v1"
+    catalog.createView(viewName, createViewInstance(), ignoreIfExists = false)
+    catalog.createView(viewName, createViewInstance(), ignoreIfExists = false)
+  }
+
+  @Test
+  def testGetView(): Unit = {
+    val originView = createViewInstance()
+    catalog.createView("v1", originView, ignoreIfExists = false)
+    assertEquals(catalog.getView("v1"), originView)
+  }
+
+  @Test(expected = classOf[ViewNotExistException])
+  def testGetNotExistView(): Unit = {
+    catalog.getView("nonexisted")
+  }
+
+  @Test
+  def testAlterView(): Unit = {
+    val viewName = "v1"
+    val view = createViewInstance()
+    catalog.createView(viewName, view, ignoreIfExists = false)
+    assertEquals(catalog.getView(viewName), view)
+
+    val newView = createNewViewInstance()
+    catalog.alterView(viewName, newView, ignoreIfNotExists = false)
+    val currentView = catalog.getView(viewName)
+    // validate the view is really replaced after alter view
+    assertNotEquals(view, currentView)
+    assertEquals(newView, currentView)
+  }
+
+  @Test(expected = classOf[ViewNotExistException])
+  def testAlterNotExistView(): Unit = {
+    catalog.alterView("nonexisted", createViewInstance(), ignoreIfNotExists = false)
+  }
+
+  @Test
+  def testDropView(): Unit = {
+    val viewName = "v1"
+    catalog.createView(viewName, createViewInstance(), ignoreIfExists = false)
+    assertTrue(catalog.listViews().contains(viewName))
+    catalog.dropView(viewName, ignoreIfNotExists = false)
+    assertFalse(catalog.listViews().contains(viewName))
+  }
+
+  @Test(expected = classOf[ViewNotExistException])
+  def testDropNotExistView(): Unit = {
+    catalog.dropView("nonexisted", ignoreIfNotExists = false)
+  }
+
+  // ------ UDF ------
+
+  @Test
+  def testCreateFunction(): Unit = {
+    assertTrue(catalog.listFunctions().isEmpty)
+    catalog.createFunction("f1", createFunctionInstance(), ignoreIfExists = false)
+    val functions = catalog.listFunctions()
+    assertEquals(1, functions.size())
+    assertEquals("f1", functions.get(0))
+  }
+
+  @Test(expected = classOf[FunctionAlreadyExistException])
+  def testCreateExistedFunction(): Unit = {
+    val functionName = "f1"
+    catalog.createFunction(functionName, createFunctionInstance(), ignoreIfExists = false)
+    catalog.createFunction(functionName, createFunctionInstance(), ignoreIfExists = false)
+  }
+
+  @Test
+  def testGetFunction(): Unit = {
+    val originFunction = createFunctionInstance()
+    catalog.createFunction("f1", originFunction, ignoreIfExists = false)
+    assertEquals(catalog.getFunction("f1"), originFunction)
+  }
+
+  @Test(expected = classOf[FunctionNotExistException])
+  def testGetNotExistFunction(): Unit = {
+    catalog.getFunction("nonexisted")
+  }
+
+  @Test
+  def testAlterFunction(): Unit = {
+    val functionName = "f1"
+    val function = createFunctionInstance()
+    catalog.createFunction(functionName, function, ignoreIfExists = false)
+    assertEquals(catalog.getFunction(functionName), function)
+
+    val newFunction = createNewFunctionInstance()
+    catalog.alterFunction(functionName, newFunction, ignoreIfNotExists = false)
+    val currentFunction = catalog.getFunction(functionName)
+    // validate the function is really replaced after alter function
+    assertNotEquals(function, currentFunction)
+    assertEquals(newFunction, currentFunction)
+  }
+
+  @Test(expected = classOf[FunctionNotExistException])
+  def testAlterNotExistFunction(): Unit = {
+    catalog.alterFunction("nonexisted", createFunctionInstance(), ignoreIfNotExists = false)
+  }
+
+  @Test
+  def testDropFunction(): Unit = {
+    val functionName = "f1"
+    catalog.createFunction(functionName, createFunctionInstance(), ignoreIfExists = false)
+    assertTrue(catalog.listFunctions().contains(functionName))
+    catalog.dropFunction(functionName, ignoreIfNotExists = false)
+    assertFalse(catalog.listFunctions().contains(functionName))
+  }
+
+  @Test(expected = classOf[FunctionNotExistException])
+  def testDropNotExistFunction(): Unit = {
+    catalog.dropFunction("nonexisted", ignoreIfNotExists = false)
+  }
+
+  // ------ Utils ------
+
   private def createTableInstance(): ExternalCatalogTable = {
     val connDesc = new TestConnectorDesc
     val schemaDesc = new Schema()
@@ -156,9 +291,33 @@ class InMemoryExternalCatalogTest {
       .asTableSource()
   }
 
+  private def createViewInstance(): String = {
+    "select a from b"
+  }
+
+  private def createNewViewInstance(): String = {
+    "select c from d"
+  }
+
+  private def createFunctionInstance(): UserDefinedFunction = {
+    new TestFunction1
+  }
+
+  private def createNewFunctionInstance(): UserDefinedFunction = {
+    new TestFunction2
+  }
+
   class TestConnectorDesc extends ConnectorDescriptor("test", 1, false) {
     override protected def toConnectorProperties: _root_.java.util.Map[String, String] = {
       _root_.java.util.Collections.emptyMap()
     }
   }
+
+  class TestFunction1 extends UserDefinedFunction {
+
+  }
+
+  class TestFunction2 extends UserDefinedFunction {
+
+  }
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services