You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2018/11/16 14:36:05 UTC

[GitHub] twalthr closed pull request #3569: [FLINK-6036] [table] Let catalog support partition

twalthr closed pull request #3569: [FLINK-6036] [table] Let catalog support partition
URL: https://github.com/apache/flink/pull/3569
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/exceptions.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/exceptions.scala
index 760cf7588f4..1b07fd8998d 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/exceptions.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/exceptions.scala
@@ -18,7 +18,9 @@
 
 package org.apache.flink.table.api
 
+import com.google.common.base.Joiner
 import org.apache.flink.table.catalog.TableSourceConverter
+import org.apache.flink.table.catalog.ExternalCatalogTypes.PartitionSpec
 
 /**
   * Exception for all errors occurring during expression parsing.
@@ -74,6 +76,50 @@ object ValidationException {
   */
 case class UnresolvedException(msg: String) extends RuntimeException(msg)
 
+/**
+  * Exception for an operation on a nonexistent partition
+  *
+  * @param db            database name
+  * @param table         table name
+  * @param partitionSpec partition spec
+  * @param cause         the cause
+  */
+case class PartitionNotExistException(
+    db: String,
+    table: String,
+    partitionSpec: PartitionSpec,
+    cause: Throwable)
+    extends RuntimeException(
+      s"Partition [${Joiner.on(",").withKeyValueSeparator("=").join(partitionSpec)}] " +
+          s"does not exist in table $db.$table!", cause) {
+
+  def this(db: String, table: String, partitionSpec: PartitionSpec) =
+    this(db, table, partitionSpec, null)
+
+}
+
+/**
+  * Exception for adding a partition that already exists
+  *
+  * @param db            database name
+  * @param table         table name
+  * @param partitionSpec partition spec
+  * @param cause         the cause
+  */
+case class PartitionAlreadyExistException(
+    db: String,
+    table: String,
+    partitionSpec: PartitionSpec,
+    cause: Throwable)
+    extends RuntimeException(
+      s"Partition [${Joiner.on(",").withKeyValueSeparator("=").join(partitionSpec)}] " +
+          s"already exists in table $db.$table!", cause) {
+
+  def this(db: String, table: String, partitionSpec: PartitionSpec) =
+    this(db, table, partitionSpec, null)
+
+}
+
 /**
   * Exception for an operation on a nonexistent table
   *
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/CrudExternalCatalog.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/CrudExternalCatalog.scala
index fcefa45fcc5..78798ce0b72 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/CrudExternalCatalog.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/CrudExternalCatalog.scala
@@ -19,12 +19,81 @@
 package org.apache.flink.table.catalog
 
 import org.apache.flink.table.api._
+import org.apache.flink.table.catalog.ExternalCatalogTypes.PartitionSpec
 
 /**
   * The CrudExternalCatalog provides methods to create, drop, and alter databases or tables.
   */
 trait CrudExternalCatalog extends ExternalCatalog {
 
+  /**
+    * Adds a partition to the catalog.
+    *
+    * @param dbName         The name of the table's database.
+    * @param tableName      The name of the table.
+    * @param part           Description of the partition to add.
+    * @param ignoreIfExists Flag to specify behavior if a partition with the given spec
+    *                       already exists:
+    *                       if set to false, it throws a PartitionAlreadyExistException,
+    *                       if set to true, nothing happens.
+    * @throws DatabaseNotExistException      thrown if the database does not exist in the catalog.
+    * @throws TableNotExistException         thrown if the table does not exist in the catalog.
+    * @throws PartitionAlreadyExistException thrown if the partition already exists and
+    *                                        ignoreIfExists is false
+    */
+  @throws[DatabaseNotExistException]
+  @throws[TableNotExistException]
+  @throws[PartitionAlreadyExistException]
+  def createPartition(
+      dbName: String,
+      tableName: String,
+      part: ExternalCatalogTablePartition,
+      ignoreIfExists: Boolean): Unit
+
+  /**
+    * Drops a partition from a table of the catalog.
+    *
+    * @param dbName            The name of the table's database.
+    * @param tableName         The name of the table.
+    * @param partSpec          Specification of the partition to drop.
+    * @param ignoreIfNotExists Flag to specify behavior if the partition does not exist:
+    *                          if set to false, it throws a PartitionNotExistException,
+    *                          if set to true, nothing happens.
+    * @throws DatabaseNotExistException  thrown if the database does not exist in the catalog.
+    * @throws TableNotExistException     thrown if the table does not exist in the catalog.
+    * @throws PartitionNotExistException thrown if the partition does not exist in the catalog.
+    */
+  @throws[DatabaseNotExistException]
+  @throws[TableNotExistException]
+  @throws[PartitionNotExistException]
+  def dropPartition(
+      dbName: String,
+      tableName: String,
+      partSpec: PartitionSpec,
+      ignoreIfNotExists: Boolean): Unit
+
+  /**
+    * Modifies an existing partition in the catalog.
+    *
+    * @param dbName            The name of the table's database.
+    * @param tableName         The name of the table.
+    * @param part              New description of the partition to update.
+    * @param ignoreIfNotExists Flag to specify behavior if the partition does not exist:
+    *                          if set to false, it throws a PartitionNotExistException,
+    *                          if set to true, nothing happens.
+    * @throws DatabaseNotExistException  thrown if the database does not exist in the catalog.
+    * @throws TableNotExistException     thrown if the table does not exist in the catalog.
+    * @throws PartitionNotExistException thrown if the partition does not exist in the catalog.
+    */
+  @throws[DatabaseNotExistException]
+  @throws[TableNotExistException]
+  @throws[PartitionNotExistException]
+  def alterPartition(
+      dbName: String,
+      tableName: String,
+      part: ExternalCatalogTablePartition,
+      ignoreIfNotExists: Boolean): Unit
+
   /**
     * Adds a table to the catalog.
     *
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalog.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalog.scala
index 00a35e41aee..5810f73d2ce 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalog.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalog.scala
@@ -20,6 +20,7 @@ package org.apache.flink.table.catalog
 
 import java.util.{List => JList}
 
+import org.apache.flink.table.catalog.ExternalCatalogTypes.PartitionSpec
 import org.apache.flink.table.api._
 
 /**
@@ -31,6 +32,38 @@ import org.apache.flink.table.api._
   */
 trait ExternalCatalog {
 
+  /**
+    * Gets a partition from the catalog.
+    *
+    * @param dbName    The name of the table's database.
+    * @param tableName The name of the table.
+    * @param partSpec  The partition specification.
+    * @throws DatabaseNotExistException  thrown if the database does not exist in the catalog.
+    * @throws TableNotExistException     thrown if the table does not exist in the catalog.
+    * @throws PartitionNotExistException thrown if the partition does not exist in the catalog.
+    * @return the requested partition
+    */
+  @throws[DatabaseNotExistException]
+  @throws[TableNotExistException]
+  @throws[PartitionNotExistException]
+  def getPartition(
+      dbName: String,
+      tableName: String,
+      partSpec: PartitionSpec): ExternalCatalogTablePartition
+
+  /**
+    * Gets the list of partition specifications of a table from the external catalog.
+    *
+    * @param dbName    database name
+    * @param tableName table name
+    * @throws DatabaseNotExistException if database does not exist in the catalog yet
+    * @throws TableNotExistException    if table does not exist in the catalog yet
+    * @return list of partition spec
+    */
+  @throws[DatabaseNotExistException]
+  @throws[TableNotExistException]
+  def listPartitions(dbName: String, tableName: String): JList[PartitionSpec]
+
   /**
     * Get a table from the catalog
     *
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogTable.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogTable.scala
index 4fdab661458..b94ab08b23b 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogTable.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogTable.scala
@@ -18,10 +18,9 @@
 
 package org.apache.flink.table.catalog
 
-import java.util.{HashMap => JHashMap, Map => JMap}
+import java.util.{HashMap => JHashMap, Map => JMap, LinkedHashSet => JLinkedHashSet}
 import java.lang.{Long => JLong}
 
-import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.table.api.TableSchema
 import org.apache.flink.table.plan.stats.TableStats
 
@@ -44,6 +43,8 @@ case class ExternalCatalogTable(
     properties: JMap[String, String] = new JHashMap(),
     stats: TableStats = null,
     comment: String = null,
+    partitionColumnNames: JLinkedHashSet[String] = new JLinkedHashSet(),
+    isPartitioned: Boolean = false,
     createTime: JLong = System.currentTimeMillis,
     lastAccessTime: JLong = -1L)
 
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogTablePartition.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogTablePartition.scala
new file mode 100644
index 00000000000..d31f8c16566
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogTablePartition.scala
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog
+
+import java.util.{HashMap => JHashMap, Map => JMap, LinkedHashMap => JLinkedHashMap}
+
+import org.apache.flink.table.plan.stats.TablePartitionStats
+
+object ExternalCatalogTypes {
+
+  /**
+    * external table partition specification.
+    * Key is partition column name, value is partition column value.
+    */
+  type PartitionSpec = JLinkedHashMap[String, String]
+}
+
+/**
+  * Partition definition of an external Catalog table
+  *
+  * @param partitionSpec partition specification
+  * @param properties    partition properties
+  * @param stats         partition statistics
+  */
+case class ExternalCatalogTablePartition(
+    partitionSpec: ExternalCatalogTypes.PartitionSpec,
+    properties: JMap[String, String] = new JHashMap(),
+    stats: TablePartitionStats = null)
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/InMemoryExternalCatalog.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/InMemoryExternalCatalog.scala
index 6a619161682..229d3f47e30 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/InMemoryExternalCatalog.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/InMemoryExternalCatalog.scala
@@ -18,11 +18,14 @@
 
 package org.apache.flink.table.catalog
 
-import org.apache.flink.table.api.{DatabaseAlreadyExistException, DatabaseNotExistException, TableAlreadyExistException, TableNotExistException}
-import java.util.{List => JList}
+import org.apache.flink.table.api._
+import _root_.java.util.{List => JList}
 
-import scala.collection.mutable.HashMap
-import scala.collection.JavaConverters._
+import org.apache.commons.collections.CollectionUtils
+import org.apache.flink.table.catalog.ExternalCatalogTypes.PartitionSpec
+
+import _root_.scala.collection.mutable
+import _root_.scala.collection.JavaConverters._
 
 /**
   * This class is an in-memory implementation of [[ExternalCatalog]].
@@ -31,7 +34,84 @@ import scala.collection.JavaConverters._
   */
 class InMemoryExternalCatalog extends CrudExternalCatalog {
 
-  private val databases = new HashMap[String, Database]
+  private val databases = new mutable.HashMap[String, DatabaseDesc]
+
+  @throws[DatabaseNotExistException]
+  @throws[TableNotExistException]
+  @throws[PartitionAlreadyExistException]
+  override def createPartition(
+      dbName: String,
+      tableName: String,
+      part: ExternalCatalogTablePartition,
+      ignoreIfExists: Boolean): Unit = synchronized {
+    val newPartSpec = part.partitionSpec
+    val partitionedTable = getPartitionedTable(dbName, tableName)
+    checkPartitionSpec(newPartSpec, partitionedTable.table)
+    if (partitionedTable.partitions.contains(newPartSpec)) {
+      if (!ignoreIfExists) {
+        throw new PartitionAlreadyExistException(dbName, tableName, newPartSpec)
+      }
+    } else {
+      partitionedTable.partitions.put(newPartSpec, part)
+    }
+  }
+
+  @throws[DatabaseNotExistException]
+  @throws[TableNotExistException]
+  @throws[PartitionNotExistException]
+  override def dropPartition(
+      dbName: String,
+      tableName: String,
+      partSpec: PartitionSpec,
+      ignoreIfNotExists: Boolean): Unit = synchronized {
+    val partitionedTable = getPartitionedTable(dbName, tableName)
+    checkPartitionSpec(partSpec, partitionedTable.table)
+    if (partitionedTable.partitions.remove(partSpec).isEmpty && !ignoreIfNotExists) {
+      throw new PartitionNotExistException(dbName, tableName, partSpec)
+    }
+  }
+
+  @throws[DatabaseNotExistException]
+  @throws[TableNotExistException]
+  @throws[PartitionNotExistException]
+  override def alterPartition(
+      dbName: String,
+      tableName: String,
+      part: ExternalCatalogTablePartition,
+      ignoreIfNotExists: Boolean): Unit = synchronized {
+    val updatedPartSpec = part.partitionSpec
+    val partitionedTable = getPartitionedTable(dbName, tableName)
+    checkPartitionSpec(updatedPartSpec, partitionedTable.table)
+    if (partitionedTable.partitions.contains(updatedPartSpec)) {
+      partitionedTable.partitions.put(updatedPartSpec, part)
+    } else if (!ignoreIfNotExists) {
+      throw new PartitionNotExistException(dbName, tableName, updatedPartSpec)
+    }
+  }
+
+  @throws[DatabaseNotExistException]
+  @throws[TableNotExistException]
+  @throws[PartitionNotExistException]
+  override def getPartition(
+      dbName: String,
+      tableName: String,
+      partSpec: PartitionSpec): ExternalCatalogTablePartition = synchronized {
+    val partitionedTable = getPartitionedTable(dbName, tableName)
+    checkPartitionSpec(partSpec, partitionedTable.table)
+    partitionedTable.partitions.get(partSpec) match {
+      case Some(part) => part
+      case None =>
+        throw new PartitionNotExistException(dbName, tableName, partSpec)
+    }
+  }
+
+  @throws[DatabaseNotExistException]
+  @throws[TableNotExistException]
+  override def listPartitions(
+      dbName: String,
+      tableName: String): JList[PartitionSpec] = synchronized {
+      getPartitionedTable(dbName, tableName).partitions.keys.toList.asJava
+    }
 
   @throws[DatabaseNotExistException]
   @throws[TableAlreadyExistException]
@@ -46,7 +126,7 @@ class InMemoryExternalCatalog extends CrudExternalCatalog {
         throw new TableAlreadyExistException(dbName, tableName)
       }
     } else {
-      tables.put(tableName, table)
+      tables.put(tableName, new TableDesc(table))
     }
   }
 
@@ -71,7 +151,7 @@ class InMemoryExternalCatalog extends CrudExternalCatalog {
     val tables = getTables(dbName)
     val tableName = table.identifier.table
     if (tables.contains(tableName)) {
-      tables.put(tableName, table)
+      tables.put(tableName, new TableDesc(table))
     } else if (!ignoreIfNotExists) {
       throw new TableNotExistException(dbName, tableName)
     }
@@ -86,11 +166,8 @@ class InMemoryExternalCatalog extends CrudExternalCatalog {
   @throws[DatabaseNotExistException]
   @throws[TableNotExistException]
   override def getTable(dbName: String, tableName: String): ExternalCatalogTable = synchronized {
-    val tables = getTables(dbName)
-    tables.get(tableName) match {
-      case Some(table) => table
-      case None => throw new TableNotExistException(dbName, tableName)
-    }
+    val tableDesc = getTableDesc(dbName, tableName)
+    tableDesc.table
   }
 
   @throws[DatabaseAlreadyExistException]
@@ -103,7 +180,7 @@ class InMemoryExternalCatalog extends CrudExternalCatalog {
         throw new DatabaseAlreadyExistException(dbName)
       }
     } else {
-      databases.put(dbName, new Database(db))
+      databases.put(dbName, new DatabaseDesc(db))
     }
   }
 
@@ -142,14 +219,47 @@ class InMemoryExternalCatalog extends CrudExternalCatalog {
     }
   }
 
-  private def getTables(db: String): HashMap[String, ExternalCatalogTable] =
+  private def getTables(db: String): mutable.HashMap[String, TableDesc] =
     databases.get(db) match {
       case Some(database) => database.tables
       case None => throw new DatabaseNotExistException(db)
     }
 
-  private class Database(var db: ExternalCatalogDatabase) {
-    val tables = new HashMap[String, ExternalCatalogTable]
+  private def getTableDesc(
+      dbName: String,
+      tableName: String): TableDesc = {
+    val tables = getTables(dbName)
+    tables.get(tableName) match {
+      case Some(tableDesc) => tableDesc
+      case None =>
+        throw new TableNotExistException(dbName, tableName)
+    }
+  }
+
+  private def getPartitionedTable(
+      dbName: String,
+      tableName: String): TableDesc = {
+    val tableDesc = getTableDesc(dbName, tableName)
+    val table = tableDesc.table
+    if (table.isPartitioned) {
+      tableDesc
+    } else {
+      throw new UnsupportedOperationException(
+        s"cannot do any operation about partition on the non-partitioned table ${table.identifier}")
+    }
+  }
+
+  private def checkPartitionSpec(partSpec: PartitionSpec, table: ExternalCatalogTable): Unit =
+    if (!CollectionUtils.isEqualCollection(partSpec.keySet, table.partitionColumnNames)) {
+      throw new IllegalArgumentException("Input partition specification is invalid!")
+    }
+
+  private class DatabaseDesc(var db: ExternalCatalogDatabase) {
+    val tables = new mutable.HashMap[String, TableDesc]
+  }
+
+  private class TableDesc(var table: ExternalCatalogTable) {
+    val partitions = new mutable.HashMap[PartitionSpec, ExternalCatalogTablePartition]
   }
 
 }
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/stats/TablePartitionStats.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/stats/TablePartitionStats.scala
new file mode 100644
index 00000000000..d7293a0dd8a
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/stats/TablePartitionStats.scala
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.stats
+
+import java.util.{HashMap, Map}
+
+/**
+  * Statistics of a single table partition
+  *
+  * @param rowCount cardinality of the partition
+  * @param colStats statistics of the partition's columns
+  */
+case class TablePartitionStats(rowCount: Long, colStats: Map[String, ColumnStats] = new HashMap())
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/catalog/ExternalCatalogTablePartition.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/catalog/ExternalCatalogTablePartition.scala
new file mode 100644
index 00000000000..4137a5a5b6a
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/catalog/ExternalCatalogTablePartition.scala
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog
+
+import java.util.{LinkedHashMap => JLinkedHashMap, Map => JMap, HashMap => JHashMap}
+
+import org.apache.flink.table.plan.stats.TablePartitionStats
+
+object ExternalCatalogTypes {
+
+  /**
+    * external table partition specification.
+    * Key is partition column name, value is partition column value.
+    */
+  type PartitionSpec = JLinkedHashMap[String, String]
+}
+
+/**
+  * Partition definition of an external Catalog table
+  *
+  * @param partitionSpec partition specification
+  * @param properties    partition properties
+  * @param stats         partition statistics
+  */
+case class ExternalCatalogTablePartition(
+    partitionSpec: ExternalCatalogTypes.PartitionSpec,
+    properties: JMap[String, String] = new JHashMap(),
+    stats: TablePartitionStats = null)
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/catalog/InMemoryExternalCatalogTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/catalog/InMemoryExternalCatalogTest.scala
index 5402780b3d3..6b9694208aa 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/catalog/InMemoryExternalCatalogTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/catalog/InMemoryExternalCatalogTest.scala
@@ -18,6 +18,9 @@
 
 package org.apache.flink.table.catalog
 
+import java.util.{LinkedHashMap => JLinkedHashMap, LinkedHashSet => JLinkedHashSet}
+
+import com.google.common.collect.{ImmutableMap, ImmutableSet}
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo
 import org.apache.flink.table.api._
 import org.junit.{Before, Test}
@@ -35,10 +38,173 @@ class InMemoryExternalCatalogTest {
     catalog.createDatabase(ExternalCatalogDatabase(databaseName), ignoreIfExists = false)
   }
 
+  @Test
+  def testCreatePartition(): Unit = {
+    val tableName = "t1"
+    catalog.createTable(
+      createPartitionedTableInstance(databaseName, tableName),
+      ignoreIfExists = false)
+    assertTrue(catalog.listPartitions(databaseName, tableName).isEmpty)
+    val newPartitionSpec = new JLinkedHashMap[String, String](
+      ImmutableMap.of("hour", "12", "ds", "2016-02-01"))
+    val newPartition = ExternalCatalogTablePartition(newPartitionSpec)
+    catalog.createPartition(databaseName, tableName, newPartition, false)
+    val partitionSpecs = catalog.listPartitions(databaseName, tableName)
+    assertEquals(partitionSpecs.size(), 1)
+    assertEquals(partitionSpecs.get(0), newPartitionSpec)
+  }
+
+  @Test(expected = classOf[UnsupportedOperationException])
+  def testCreatePartitionOnUnPartitionedTable(): Unit = {
+    val tableName = "t1"
+    catalog.createTable(
+      createNonPartitionedTableInstance(databaseName, tableName),
+      ignoreIfExists = false)
+    val newPartitionSpec = new JLinkedHashMap[String, String](
+      ImmutableMap.of("ds", "2016-02-01", "hour", "12"))
+    val newPartition = ExternalCatalogTablePartition(newPartitionSpec)
+    catalog.createPartition(databaseName, tableName, newPartition, false)
+  }
+
+  @Test(expected = classOf[IllegalArgumentException])
+  def testCreateInvalidPartitionSpec(): Unit = {
+    val tableName = "t1"
+    catalog.createTable(
+      createPartitionedTableInstance(databaseName, tableName),
+      ignoreIfExists = false)
+    val newPartitionSpec = new JLinkedHashMap[String, String](
+      ImmutableMap.of("ds", "2016-02-01", "h", "12"))
+    val newPartition = ExternalCatalogTablePartition(newPartitionSpec)
+    catalog.createPartition(databaseName, tableName, newPartition, false)
+  }
+
+  @Test(expected = classOf[PartitionAlreadyExistException])
+  def testCreateExistedPartition(): Unit = {
+    val tableName = "t1"
+    catalog.createTable(
+      createPartitionedTableInstance(databaseName, tableName),
+      ignoreIfExists = false)
+    val newPartitionSpec = new JLinkedHashMap[String, String](
+      ImmutableMap.of("ds", "2016-02-01", "hour", "12"))
+    val newPartition = ExternalCatalogTablePartition(newPartitionSpec)
+    catalog.createPartition(databaseName, tableName, newPartition, false)
+    val newPartitionSpec1 = new JLinkedHashMap[String, String](
+      ImmutableMap.of("hour", "12", "ds", "2016-02-01"))
+    val newPartition1 = ExternalCatalogTablePartition(newPartitionSpec1)
+    catalog.createPartition(databaseName, tableName, newPartition1, false)
+  }
+
+  @Test(expected = classOf[TableNotExistException])
+  def testCreatePartitionOnNotExistTable(): Unit = {
+    val newPartitionSpec = new JLinkedHashMap[String, String](
+      ImmutableMap.of("ds", "2016-02-01", "hour", "12"))
+    val newPartition = ExternalCatalogTablePartition(newPartitionSpec)
+    catalog.createPartition(databaseName, "notexistedTb", newPartition, false)
+  }
+
+  @Test(expected = classOf[DatabaseNotExistException])
+  def testCreatePartitionOnNotExistDatabase(): Unit = {
+    val tableName = "t1"
+    val newPartitionSpec = new JLinkedHashMap[String, String](
+      ImmutableMap.of("ds", "2016-02-01", "hour", "12"))
+    val newPartition = ExternalCatalogTablePartition(newPartitionSpec)
+    catalog.createPartition("notexistedDb", tableName, newPartition, false)
+  }
+
+  @Test
+  def testGetPartition(): Unit = {
+    val tableName = "t1"
+    catalog.createTable(
+      createPartitionedTableInstance(databaseName, tableName),
+      ignoreIfExists = false)
+    val newPartitionSpec = new JLinkedHashMap[String, String](
+      ImmutableMap.of("ds", "2016-02-01", "hour", "12"))
+    val newPartition = ExternalCatalogTablePartition(newPartitionSpec)
+    catalog.createPartition(databaseName, tableName, newPartition, false)
+    assertEquals(catalog.getPartition(databaseName, tableName, newPartitionSpec), newPartition)
+  }
+
+  @Test(expected = classOf[PartitionNotExistException])
+  def testGetNotExistPartition(): Unit = {
+    val tableName = "t1"
+    catalog.createTable(
+      createPartitionedTableInstance(databaseName, tableName),
+      ignoreIfExists = false)
+    val newPartitionSpec = new JLinkedHashMap[String, String](
+      ImmutableMap.of("ds", "2016-02-01", "hour", "12"))
+    val newPartition = ExternalCatalogTablePartition(newPartitionSpec)
+    assertEquals(catalog.getPartition(databaseName, tableName, newPartitionSpec), newPartition)
+  }
+
+  @Test
+  def testDropPartition(): Unit = {
+    val tableName = "t1"
+    catalog.createTable(
+      createPartitionedTableInstance(databaseName, tableName),
+      ignoreIfExists = false)
+    val newPartitionSpec = new JLinkedHashMap[String, String](
+      ImmutableMap.of("ds", "2016-02-01", "hour", "12"))
+    val newPartition = ExternalCatalogTablePartition(newPartitionSpec)
+    catalog.createPartition(databaseName, tableName, newPartition, false)
+    assertTrue(catalog.listPartitions(databaseName, tableName).contains(newPartitionSpec))
+    catalog.dropPartition(databaseName, tableName, newPartitionSpec, false)
+    assertFalse(catalog.listPartitions(databaseName, tableName).contains(newPartitionSpec))
+  }
+
+  @Test(expected = classOf[PartitionNotExistException])
+  def testDropNotExistPartition(): Unit = {
+    val tableName = "t1"
+    catalog.createTable(
+      createPartitionedTableInstance(databaseName, tableName),
+      ignoreIfExists = false)
+    val partitionSpec = new JLinkedHashMap[String, String](
+      ImmutableMap.of("ds", "2016-02-01", "hour", "12"))
+    catalog.dropPartition(databaseName, tableName, partitionSpec, false)
+  }
+
+  @Test
+  def testListPartitionSpec(): Unit = {
+    val tableName = "t1"
+    catalog.createTable(
+      createPartitionedTableInstance(databaseName, tableName),
+      ignoreIfExists = false)
+    assertTrue(catalog.listPartitions(databaseName, tableName).isEmpty)
+    val newPartitionSpec = new JLinkedHashMap[String, String](
+      ImmutableMap.of("ds", "2016-02-01", "hour", "12"))
+    val newPartition = ExternalCatalogTablePartition(newPartitionSpec)
+    catalog.createPartition(databaseName, tableName, newPartition, false)
+    val partitionSpecs = catalog.listPartitions(databaseName, tableName)
+    assertEquals(partitionSpecs.size(), 1)
+    assertEquals(partitionSpecs.get(0), newPartitionSpec)
+  }
+
+  @Test
+  def testAlterPartition(): Unit = {
+    val tableName = "t1"
+    catalog.createTable(
+      createPartitionedTableInstance(databaseName, tableName),
+      ignoreIfExists = false)
+    val newPartitionSpec = new JLinkedHashMap[String, String](
+      ImmutableMap.of("ds", "2016-02-01", "hour", "12"))
+    val newPartition = ExternalCatalogTablePartition(
+      newPartitionSpec,
+      properties = ImmutableMap.of("location" , "/tmp/ds=2016-02-01/hour=12"))
+    catalog.createPartition(databaseName, tableName, newPartition, false)
+    val updatedPartition = ExternalCatalogTablePartition(
+      newPartitionSpec,
+      properties = ImmutableMap.of("location",  "/tmp1/ds=2016-02-01/hour=12"))
+    catalog.alterPartition(databaseName, tableName, updatedPartition, false)
+    val currentPartition = catalog.getPartition(databaseName, tableName, newPartitionSpec)
+    assertEquals(currentPartition, updatedPartition)
+    assertNotEquals(currentPartition, newPartition)
+  }
+
   @Test
   def testCreateTable(): Unit = {
     assertTrue(catalog.listTables(databaseName).isEmpty)
-    catalog.createTable(createTableInstance(databaseName, "t1"), ignoreIfExists = false)
+    catalog.createTable(
+      createNonPartitionedTableInstance(databaseName, "t1"),
+      ignoreIfExists = false)
     val tables = catalog.listTables(databaseName)
     assertEquals(1, tables.size())
     assertEquals("t1", tables.get(0))
@@ -47,13 +213,13 @@ class InMemoryExternalCatalogTest {
   @Test(expected = classOf[TableAlreadyExistException])
   def testCreateExistedTable(): Unit = {
     val tableName = "t1"
-    catalog.createTable(createTableInstance(databaseName, tableName), false)
-    catalog.createTable(createTableInstance(databaseName, tableName), false)
+    catalog.createTable(createNonPartitionedTableInstance(databaseName, tableName), false)
+    catalog.createTable(createNonPartitionedTableInstance(databaseName, tableName), false)
   }
 
   @Test
   def testGetTable(): Unit = {
-    val originTable = createTableInstance(databaseName, "t1")
+    val originTable = createNonPartitionedTableInstance(databaseName, "t1")
     catalog.createTable(originTable, false)
     assertEquals(catalog.getTable(databaseName, "t1"), originTable)
   }
@@ -71,10 +237,10 @@ class InMemoryExternalCatalogTest {
   @Test
   def testAlterTable(): Unit = {
     val tableName = "t1"
-    val table = createTableInstance(databaseName, tableName)
+    val table = createNonPartitionedTableInstance(databaseName, tableName)
     catalog.createTable(table, false)
     assertEquals(catalog.getTable(databaseName, tableName), table)
-    val newTable = createTableInstance(databaseName, tableName)
+    val newTable = createNonPartitionedTableInstance(databaseName, tableName)
     catalog.alterTable(newTable, false)
     val currentTable = catalog.getTable(databaseName, tableName)
     // validate the table is really replaced after alter table
@@ -84,13 +250,13 @@ class InMemoryExternalCatalogTest {
 
   @Test(expected = classOf[TableNotExistException])
   def testAlterNotExistTable(): Unit = {
-    catalog.alterTable(createTableInstance(databaseName, "t1"), false)
+    catalog.alterTable(createNonPartitionedTableInstance(databaseName, "t1"), false)
   }
 
   @Test
   def testDropTable(): Unit = {
     val tableName = "t1"
-    catalog.createTable(createTableInstance(databaseName, tableName), false)
+    catalog.createTable(createNonPartitionedTableInstance(databaseName, tableName), false)
     assertTrue(catalog.listTables(databaseName).contains(tableName))
     catalog.dropTable(databaseName, tableName, false)
     assertFalse(catalog.listTables(databaseName).contains(tableName))
@@ -130,17 +296,37 @@ class InMemoryExternalCatalogTest {
     catalog.createDatabase(ExternalCatalogDatabase(databaseName), false)
   }
 
-  private def createTableInstance(dbName: String, tableName: String): ExternalCatalogTable = {
+  private def createNonPartitionedTableInstance(
+      dbName: String,
+      tableName: String): ExternalCatalogTable = {
     val schema = new TableSchema(
       Array("first", "second"),
       Array(
         BasicTypeInfo.STRING_TYPE_INFO,
         BasicTypeInfo.INT_TYPE_INFO
-      )
-    )
+      ))
     ExternalCatalogTable(
       TableIdentifier(dbName, tableName),
       "csv",
       schema)
   }
+
+  private def createPartitionedTableInstance(
+      dbName: String,
+      tableName: String): ExternalCatalogTable = {
+    val schema = new TableSchema(
+      Array("first", "second"),
+      Array(
+        BasicTypeInfo.STRING_TYPE_INFO,
+        BasicTypeInfo.INT_TYPE_INFO
+      ))
+    ExternalCatalogTable(
+      TableIdentifier(dbName, tableName),
+      "hive",
+      schema,
+      partitionColumnNames = new JLinkedHashSet(ImmutableSet.of("ds","hour")),
+      isPartitioned = true
+    )
+  }
+
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services