You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ni...@apache.org on 2018/10/08 15:57:36 UTC
ignite git commit: IGNITE-9228: Spark SQL Table Schema Specification
- Fixes #4551.
Repository: ignite
Updated Branches:
refs/heads/master 5f055c7f5 -> b50a9dc18
IGNITE-9228: Spark SQL Table Schema Specification - Fixes #4551.
Signed-off-by: Nikolay Izhikov <ni...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/b50a9dc1
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/b50a9dc1
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/b50a9dc1
Branch: refs/heads/master
Commit: b50a9dc18a115d5fc599ac15a11546911ae0bc45
Parents: 5f055c7
Author: stuartmacd <29...@users.noreply.github.com>
Authored: Mon Oct 8 18:21:04 2018 +0300
Committer: Nikolay Izhikov <ni...@apache.org>
Committed: Mon Oct 8 18:46:40 2018 +0300
----------------------------------------------------------------------
.../ignite/spark/IgniteDataFrameSettings.scala | 16 +++-
.../spark/impl/IgniteRelationProvider.scala | 17 +++-
.../ignite/spark/impl/IgniteSQLRelation.scala | 12 +--
.../apache/ignite/spark/impl/QueryHelper.scala | 13 ++-
.../org/apache/ignite/spark/impl/package.scala | 95 ++++++++++----------
.../sql/ignite/IgniteExternalCatalog.scala | 92 +++++++++----------
.../spark/sql/ignite/IgniteOptimization.scala | 3 +-
.../ignite/spark/AbstractDataFrameSpec.scala | 4 +-
.../apache/ignite/spark/IgniteCatalogSpec.scala | 76 +++++++++++++++-
.../ignite/spark/IgniteSQLDataFrameSpec.scala | 39 ++++++++
.../spark/IgniteSQLDataFrameWriteSpec.scala | 23 ++++-
11 files changed, 274 insertions(+), 116 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/b50a9dc1/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteDataFrameSettings.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteDataFrameSettings.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteDataFrameSettings.scala
index 9daaec4..e176721 100644
--- a/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteDataFrameSettings.scala
+++ b/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteDataFrameSettings.scala
@@ -55,6 +55,20 @@ object IgniteDataFrameSettings {
val OPTION_TABLE = "table"
/**
+ * Config option to specify the Ignite SQL schema name in which the specified table is present.
+ * If this is not specified, all schemata will be scanned for a table name which matches the given table
+ * name and the first matching table will be used. This option can be used when there are multiple tables in
+ * different schemata with the same table name to disambiguate the tables.
+ *
+ * @example {{{
+ * val igniteDF = spark.read.format(IGNITE)
+ * .option(OPTION_TABLE, "myTable")
+ * .option(OPTION_SCHEMA, "mySchema")
+ * }}}
+ */
+ val OPTION_SCHEMA = "schema"
+
+ /**
* Config option to specify newly created Ignite SQL table parameters.
* Value of these option will be used in `CREATE TABLE ... WITH "option value goes here"`
*
@@ -123,7 +137,7 @@ object IgniteDataFrameSettings {
/**
* Config option for saving data frame.
* Internally all SQL inserts are done through `IgniteDataStreamer`.
- * This options sets perNodeBufferSize` property of streamer.
+ * This option sets the `perNodeBufferSize` property of the streamer.
*
* @example {{{
* val igniteDF = spark.write.format(IGNITE)
http://git-wip-us.apache.org/repos/asf/ignite/blob/b50a9dc1/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteRelationProvider.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteRelationProvider.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteRelationProvider.scala
index e4fa9f7..71195d5 100644
--- a/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteRelationProvider.scala
+++ b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteRelationProvider.scala
@@ -55,12 +55,14 @@ class IgniteRelationProvider extends RelationProvider
* @see IgniteRelation
* @see IgnitionEx#grid(String)
* @see org.apache.ignite.spark.IgniteDataFrameSettings.OPTION_TABLE
+ * @see org.apache.ignite.spark.IgniteDataFrameSettings.OPTION_SCHEMA
* @see org.apache.ignite.spark.IgniteDataFrameSettings.OPTION_CONFIG_FILE
*/
override def createRelation(sqlCtx: SQLContext, params: Map[String, String]): BaseRelation =
createRelation(
igniteContext(params, sqlCtx),
params.getOrElse(OPTION_TABLE, throw new IgniteException("'table' must be specified.")),
+ params.get(OPTION_SCHEMA),
sqlCtx)
/**
@@ -101,7 +103,7 @@ class IgniteRelationProvider extends RelationProvider
val tblName = tableName(params)
- val tblInfoOption = sqlTableInfo[Any, Any](ctx.ignite(), tblName)
+ val tblInfoOption = sqlTableInfo[Any, Any](ctx.ignite(), tblName, params.get(OPTION_SCHEMA))
if (tblInfoOption.isDefined) {
mode match {
@@ -120,6 +122,7 @@ class IgniteRelationProvider extends RelationProvider
saveTable(data,
tblName,
+ params.get(OPTION_SCHEMA),
ctx,
params.get(OPTION_STREAMER_ALLOW_OVERWRITE).map(_.toBoolean),
params.get(OPTION_STREAMER_FLUSH_FREQUENCY).map(_.toLong),
@@ -129,6 +132,7 @@ class IgniteRelationProvider extends RelationProvider
case Append ⇒
saveTable(data,
tblName,
+ params.get(OPTION_SCHEMA),
ctx,
params.get(OPTION_STREAMER_ALLOW_OVERWRITE).map(_.toBoolean),
params.get(OPTION_STREAMER_FLUSH_FREQUENCY).map(_.toLong),
@@ -159,6 +163,7 @@ class IgniteRelationProvider extends RelationProvider
saveTable(data,
tblName,
+ params.get(OPTION_SCHEMA),
ctx,
params.get(OPTION_STREAMER_ALLOW_OVERWRITE).map(_.toBoolean),
params.get(OPTION_STREAMER_FLUSH_FREQUENCY).map(_.toLong),
@@ -168,25 +173,28 @@ class IgniteRelationProvider extends RelationProvider
createRelation(ctx,
tblName,
+ params.get(OPTION_SCHEMA),
sqlCtx)
}
/**
* @param igniteCtx Ignite context.
* @param tblName Table name.
+ * @param schema Optional schema name.
* @param sqlCtx SQL context.
* @return Ignite SQL relation.
*/
- private def createRelation(igniteCtx: IgniteContext, tblName: String, sqlCtx: SQLContext): BaseRelation = {
+ private def createRelation(igniteCtx: IgniteContext, tblName: String, schema: Option[String], sqlCtx: SQLContext):
+ BaseRelation = {
val optimizationDisabled =
sqlCtx.sparkSession.conf.get(OPTION_DISABLE_SPARK_SQL_OPTIMIZATION, "false").toBoolean
val experimentalMethods = sqlCtx.sparkSession.sessionState.experimentalMethods
if (optimizationDisabled) {
- experimentalMethods.extraOptimizations =
+ experimentalMethods.extraOptimizations =
experimentalMethods.extraOptimizations.filter(_ != IgniteOptimization)
- }
+ }
else {
val optimizationExists = experimentalMethods.extraOptimizations.contains(IgniteOptimization)
@@ -197,6 +205,7 @@ class IgniteRelationProvider extends RelationProvider
IgniteSQLRelation(
igniteCtx,
tblName,
+ schema,
sqlCtx)
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/b50a9dc1/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteSQLRelation.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteSQLRelation.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteSQLRelation.scala
index 1b4f277..cabf311 100644
--- a/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteSQLRelation.scala
+++ b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteSQLRelation.scala
@@ -34,14 +34,15 @@ import scala.collection.JavaConversions._
*/
class IgniteSQLRelation[K, V](
private[apache] val ic: IgniteContext,
- private[apache] val tableName: String)
+ private[apache] val tableName: String,
+ private[apache] val schemaName: Option[String])
(@transient val sqlContext: SQLContext) extends BaseRelation with PrunedFilteredScan with Logging {
/**
* @return Schema of Ignite SQL table.
*/
override def schema: StructType =
- igniteSQLTable(ic.ignite(), tableName)
+ igniteSQLTable(ic.ignite(), tableName, schemaName)
.map(IgniteSQLRelation.schema)
.getOrElse(throw new IgniteException(s"Unknown table $tableName"))
@@ -101,7 +102,7 @@ class IgniteSQLRelation[K, V](
* Cache name for a table name.
*/
private lazy val cacheName: String =
- sqlCacheName(ic.ignite(), tableName)
+ sqlCacheName(ic.ignite(), tableName, schemaName)
.getOrElse(throw new IgniteException(s"Unknown table $tableName"))
}
@@ -126,6 +127,7 @@ object IgniteSQLRelation {
})
}
- def apply[K, V](ic: IgniteContext, tableName: String, sqlContext: SQLContext): IgniteSQLRelation[K, V] =
- new IgniteSQLRelation[K, V](ic,tableName)(sqlContext)
+ def apply[K, V](ic: IgniteContext, tableName: String, schemaName: Option[String],
+ sqlContext: SQLContext): IgniteSQLRelation[K, V] =
+ new IgniteSQLRelation[K, V](ic, tableName, schemaName)(sqlContext)
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/b50a9dc1/modules/spark/src/main/scala/org/apache/ignite/spark/impl/QueryHelper.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/impl/QueryHelper.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/QueryHelper.scala
index 4342265..fc2a40d 100644
--- a/modules/spark/src/main/scala/org/apache/ignite/spark/impl/QueryHelper.scala
+++ b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/QueryHelper.scala
@@ -21,6 +21,7 @@ import org.apache.ignite.cache.query.SqlFieldsQuery
import org.apache.ignite.spark.IgniteDataFrameSettings._
import QueryUtils.{compileCreateTable, compileDropTable, compileInsert}
import org.apache.ignite.internal.IgniteEx
+import org.apache.ignite.internal.processors.query.QueryUtils.DFLT_SCHEMA
import org.apache.ignite.spark.IgniteContext
import org.apache.ignite.{Ignite, IgniteException}
import org.apache.spark.sql.types.StructType
@@ -72,6 +73,11 @@ private[apache] object QueryHelper {
if (!params.contains(OPTION_TABLE) && !params.contains("path"))
throw new IgniteException("'table' must be specified.")
+ if (params.contains(OPTION_SCHEMA) && !params(OPTION_SCHEMA).equalsIgnoreCase(DFLT_SCHEMA)) {
+ throw new IgniteException("Creating new tables in schema " + params(OPTION_SCHEMA) + " is not valid, tables"
+ + " must only be created in " + DFLT_SCHEMA)
+ }
+
params.get(OPTION_CREATE_TABLE_PRIMARY_KEY_FIELDS)
.map(_.split(','))
.getOrElse(throw new IgniteException("Can't create table! Primary key fields has to be specified."))
@@ -91,6 +97,7 @@ private[apache] object QueryHelper {
*
* @param data Data.
* @param tblName Table name.
+ * @param schemaName Optional schema name.
* @param ctx Ignite context.
* @param streamerAllowOverwrite Flag enabling overwriting existing values in cache.
* @param streamerFlushFrequency Insert query streamer automatic flush frequency.
@@ -105,6 +112,7 @@ private[apache] object QueryHelper {
*/
def saveTable(data: DataFrame,
tblName: String,
+ schemaName: Option[String],
ctx: IgniteContext,
streamerAllowOverwrite: Option[Boolean],
streamerFlushFrequency: Option[Long],
@@ -117,6 +125,7 @@ private[apache] object QueryHelper {
savePartition(iterator,
insertQry,
tblName,
+ schemaName,
ctx,
streamerAllowOverwrite,
streamerFlushFrequency,
@@ -131,6 +140,7 @@ private[apache] object QueryHelper {
* @param iterator Data iterator.
* @param insertQry Insert query.
* @param tblName Table name.
+ * @param schemaName Optional schema name.
* @param ctx Ignite context.
* @param streamerAllowOverwrite Flag enabling overwriting existing values in cache.
* @param streamerFlushFrequency Insert query streamer automatic flush frequency.
@@ -146,13 +156,14 @@ private[apache] object QueryHelper {
private def savePartition(iterator: Iterator[Row],
insertQry: String,
tblName: String,
+ schemaName: Option[String],
ctx: IgniteContext,
streamerAllowOverwrite: Option[Boolean],
streamerFlushFrequency: Option[Long],
streamerPerNodeBufferSize: Option[Int],
streamerPerNodeParallelOperations: Option[Int]
): Unit = {
- val tblInfo = sqlTableInfo[Any, Any](ctx.ignite(), tblName).get
+ val tblInfo = sqlTableInfo[Any, Any](ctx.ignite(), tblName, schemaName).get
val streamer = ctx.ignite().dataStreamer(tblInfo._1.getName)
http://git-wip-us.apache.org/repos/asf/ignite/blob/b50a9dc1/modules/spark/src/main/scala/org/apache/ignite/spark/impl/package.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/impl/package.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/package.scala
index be7b366..36ec956 100644
--- a/modules/spark/src/main/scala/org/apache/ignite/spark/impl/package.scala
+++ b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/package.scala
@@ -18,10 +18,11 @@
package org.apache.ignite.spark
import org.apache.commons.lang.StringUtils.equalsIgnoreCase
-import org.apache.ignite.{Ignite, IgniteException, IgniteState, Ignition}
+import org.apache.ignite.{Ignite, Ignition}
import org.apache.ignite.cache.{CacheMode, QueryEntity}
import org.apache.ignite.cluster.ClusterNode
import org.apache.ignite.configuration.CacheConfiguration
+import org.apache.ignite.internal.processors.query.QueryUtils.normalizeSchemaName
import org.apache.ignite.internal.util.lang.GridFunc.contains
import org.apache.spark.Partition
import org.apache.spark.sql.catalyst.catalog.SessionCatalog
@@ -31,26 +32,6 @@ import scala.collection.mutable.ArrayBuffer
package object impl {
/**
- * Checks named instance of Ignite exists.
- * Throws IgniteException if not.
- *
- * @param gridName Name of grid.
- */
- def ensureIgnite(gridName: String): Unit =
- if (!igniteExists(gridName))
- throw new IgniteException(s"Ignite grid with name '$gridName' does not exist.")
-
- /**
- * @param gridName Name of grid.
- * @return True if named instance of Ignite exists false otherwise.
- */
- def igniteExists(gridName: String): Boolean =
- if (gridName == "")
- Ignition.state() == IgniteState.STARTED
- else
- Ignition.state(gridName) == IgniteState.STARTED
-
- /**
* @param g Ignite.
* @return Name of Ignite. If name is null empty string returned.
*/
@@ -61,19 +42,15 @@ package object impl {
""
/**
- * @param name Name of grid..
- * @param default Default instance.
- * @return Named grid instance if it exists. If not default instance returned.
+ * @param schema Name of schema.
+ * @param default Default schema.
+ * @return `default` if `schema` equals Spark's default database name, otherwise `schema`.
*/
- def igniteOrDefault(name: String, default: Ignite): Ignite =
- if (name == SessionCatalog.DEFAULT_DATABASE) {
- if (igniteExists(name))
- ignite(name)
- else
- default
- }
+ def schemaOrDefault(schema: String, default: String): String =
+ if (schema == SessionCatalog.DEFAULT_DATABASE)
+ default
else
- ignite(name)
+ schema
/**
* @param gridName Name of grid.
@@ -88,42 +65,68 @@ package object impl {
/**
* @param ignite Ignite instance.
* @param tabName Table name.
+ * @param schemaName Optional schema name.
* @return True if table exists false otherwise.
*/
- def sqlTableExists(ignite: Ignite, tabName: String): Boolean =
- sqlTableInfo(ignite, tabName).isDefined
+ def sqlTableExists(ignite: Ignite, tabName: String, schemaName: Option[String]): Boolean =
+ sqlTableInfo(ignite, tabName, schemaName).isDefined
/**
* @param ignite Ignite instance.
* @param tabName Table name.
+ * @param schemaName Optional schema name.
* @return QueryEntity for a given table.
*/
- def igniteSQLTable(ignite: Ignite, tabName: String): Option[QueryEntity] =
- sqlTableInfo[Any, Any](ignite, tabName).map(_._2)
+ def igniteSQLTable(ignite: Ignite, tabName: String, schemaName: Option[String]): Option[QueryEntity] =
+ sqlTableInfo[Any, Any](ignite, tabName, schemaName).map(_._2)
/**
* @param ignite Ignite instance.
* @param tabName Table name.
+ * @param schemaName Optional schema name.
* @return Cache name for given table.
*/
- def sqlCacheName(ignite: Ignite, tabName: String): Option[String] =
- sqlTableInfo[Any, Any](ignite, tabName).map(_._1.getName)
+ def sqlCacheName(ignite: Ignite, tabName: String, schemaName: Option[String]): Option[String] =
+ sqlTableInfo[Any, Any](ignite, tabName, schemaName).map(_._1.getName)
+
+ /**
+ * @param ignite Ignite instance.
+ * @return Distinct normalized SQL schema names of all caches in the given Ignite instance.
+ */
+ def allSchemas(ignite: Ignite): Seq[String] = ignite.cacheNames()
+ .map(name =>
+ normalizeSchemaName(name,
+ ignite.cache[Any,Any](name).getConfiguration(classOf[CacheConfiguration[Any,Any]]).getSqlSchema))
+ .toSeq
+ .distinct
+
+ /**
+ * @param ignite Ignite instance.
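+ * @param schemaName Optional schema name.
+ * @return Configurations of caches whose normalized SQL schema matches `schemaName` (ignoring case), or of all caches if `schemaName` is `None` or Spark's default database name.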
+ */
+ def cachesForSchema[K,V](ignite: Ignite, schemaName: Option[String]): Seq[CacheConfiguration[K,V]] =
+ ignite.cacheNames
+ .map(ignite.cache[K,V](_).getConfiguration(classOf[CacheConfiguration[K,V]]))
+ .filter(ccfg =>
+ schemaName.forall(normalizeSchemaName(ccfg.getName, ccfg.getSqlSchema).equalsIgnoreCase(_)) ||
+ schemaName.contains(SessionCatalog.DEFAULT_DATABASE))
+ .toSeq
/**
* @param ignite Ignite instance.
* @param tabName Table name.
+ * @param schemaName Optional schema name.
* @tparam K Key class.
* @tparam V Value class.
* @return CacheConfiguration and QueryEntity for a given table.
*/
- def sqlTableInfo[K, V](ignite: Ignite, tabName: String): Option[(CacheConfiguration[K, V], QueryEntity)] =
- ignite.cacheNames().map { cacheName ⇒
- val ccfg = ignite.cache[K, V](cacheName).getConfiguration(classOf[CacheConfiguration[K, V]])
-
- val queryEntities = ccfg.getQueryEntities
-
- queryEntities.find(_.getTableName.equalsIgnoreCase(tabName)).map(qe ⇒ (ccfg, qe))
- }.find(_.isDefined).flatten
+ def sqlTableInfo[K, V](ignite: Ignite, tabName: String,
+ schemaName: Option[String]): Option[(CacheConfiguration[K, V], QueryEntity)] =
+ cachesForSchema[K,V](ignite, schemaName)
+ .map(ccfg ⇒ ccfg.getQueryEntities.find(_.getTableName.equalsIgnoreCase(tabName)).map(qe ⇒ (ccfg, qe)))
+ .find(_.isDefined)
+ .flatten
/**
* @param table Table.
http://git-wip-us.apache.org/repos/asf/ignite/blob/b50a9dc1/modules/spark/src/main/scala/org/apache/spark/sql/ignite/IgniteExternalCatalog.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/scala/org/apache/spark/sql/ignite/IgniteExternalCatalog.scala b/modules/spark/src/main/scala/org/apache/spark/sql/ignite/IgniteExternalCatalog.scala
index f632d2a..9e8290c 100644
--- a/modules/spark/src/main/scala/org/apache/spark/sql/ignite/IgniteExternalCatalog.scala
+++ b/modules/spark/src/main/scala/org/apache/spark/sql/ignite/IgniteExternalCatalog.scala
@@ -19,12 +19,12 @@ package org.apache.spark.sql.ignite
import java.net.URI
-import org.apache.ignite.configuration.CacheConfiguration
+import org.apache.ignite.internal.processors.query.QueryUtils.DFLT_SCHEMA
import org.apache.ignite.spark.IgniteDataFrameSettings.OPTION_TABLE
import org.apache.ignite.spark.IgniteContext
import org.apache.ignite.spark.IgniteDataFrameSettings._
import org.apache.ignite.spark.impl.IgniteSQLRelation.schema
-import org.apache.ignite.{Ignite, IgniteException, Ignition}
+import org.apache.ignite.{Ignite, IgniteException}
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
@@ -41,14 +41,16 @@ import scala.collection.JavaConversions._
/**
* External catalog implementation to provide transparent access to SQL tables existed in Ignite.
*
- * @param defaultIgniteContext Ignite context to provide access to Ignite instance. If <code>None</code> passed then no-name instance of Ignite used.
+ * @param igniteContext Ignite context to provide access to Ignite instance.
*/
-private[ignite] class IgniteExternalCatalog(defaultIgniteContext: IgniteContext)
+private[ignite] class IgniteExternalCatalog(igniteContext: IgniteContext)
extends ExternalCatalog {
/**
* Default Ignite instance.
*/
- @transient private var default: Ignite = defaultIgniteContext.ignite
+ @transient private val ignite: Ignite = igniteContext.ignite()
+
+ @transient private var currentSchema = DEFAULT_DATABASE
/**
* @param db Ignite instance name.
@@ -58,56 +60,52 @@ private[ignite] class IgniteExternalCatalog(defaultIgniteContext: IgniteContext)
CatalogDatabase(db, db, IGNITE_URI, Map.empty)
/**
- * Checks Ignite instance with provided name exists.
- * If <code>db == SessionCatalog.DEFAULT_DATABASE</code> checks for a default Ignite instance.
+ * Checks Ignite schema with provided name exists.
*
- * @param db Ignite instance name or <code>SessionCatalog.DEFAULT_DATABASE</code>.
- * @return True is Ignite instance exists.
+ * @param schema Ignite schema name or <code>SessionCatalog.DEFAULT_DATABASE</code>.
+ * @return True is Ignite schema exists.
*/
- override def databaseExists(db: String): Boolean =
- db == DEFAULT_DATABASE || igniteExists(db)
+ override def databaseExists(schema: String): Boolean =
+ schema == DEFAULT_DATABASE || allSchemas(ignite).exists(schema.equalsIgnoreCase)
/**
- * @return List of all known Ignite instances names.
+ * @return List of all known Ignite schemas.
*/
override def listDatabases(): Seq[String] =
- Ignition.allGrids().map(igniteName)
+ allSchemas(ignite)
/**
* @param pattern Pattern to filter databases names.
- * @return List of all known Ignite instances names filtered by pattern.
+ * @return List of all known Ignite schema names filtered by pattern.
*/
override def listDatabases(pattern: String): Seq[String] =
StringUtils.filterPattern(listDatabases(), pattern)
/**
- * Sets default Ignite instance.
+ * Sets default Ignite schema.
*
- * @param db Name of Ignite instance.
+ * @param schema Name of Ignite schema.
*/
- override def setCurrentDatabase(db: String): Unit = {
- ensureIgnite(db)
-
- default = ignite(db)
- }
+ override def setCurrentDatabase(schema: String): Unit =
+ currentSchema = schema
/** @inheritdoc */
override def getTable(db: String, table: String): CatalogTable = getTableOption(db, table).get
- def getTableOption(db: String, tabName: String): Option[CatalogTable] = {
- val ignite = igniteOrDefault(db, default)
-
+ def getTableOption(db: String, tabName: String): Option[CatalogTable] = {
val gridName = igniteName(ignite)
- igniteSQLTable(ignite, tabName) match {
+ val schemaName = schemaOrDefault(db, currentSchema)
+
+ igniteSQLTable(ignite, tabName, Some(db)) match {
case Some(table) ⇒
val tableName = table.getTableName
Some(new CatalogTable(
- identifier = new TableIdentifier(tableName, Some(gridName)),
+ identifier = new TableIdentifier(tableName, Some(schemaName)),
tableType = CatalogTableType.EXTERNAL,
storage = CatalogStorageFormat(
- locationUri = Some(URI.create(IGNITE_PROTOCOL + tableName)),
+ locationUri = Some(URI.create(IGNITE_PROTOCOL + schemaName + "/" + tableName)),
inputFormat = Some(FORMAT_IGNITE),
outputFormat = Some(FORMAT_IGNITE),
serde = None,
@@ -130,23 +128,16 @@ private[ignite] class IgniteExternalCatalog(defaultIgniteContext: IgniteContext)
/** @inheritdoc */
override def tableExists(db: String, table: String): Boolean =
- sqlTableExists(igniteOrDefault(db, default), table)
+ sqlTableExists(ignite, table, Some(schemaOrDefault(db, currentSchema)))
/** @inheritdoc */
override def listTables(db: String): Seq[String] = listTables(db, ".*")
/** @inheritdoc */
- override def listTables(db: String, pattern: String): Seq[String] = {
- val ignite = igniteOrDefault(db, default)
-
- ignite.cacheNames.flatten { name =>
- val cache = ignite.cache[Any, Any](name)
-
- val ccfg = cache.getConfiguration(classOf[CacheConfiguration[Any, Any]])
-
- ccfg.getQueryEntities.map(_.getTableName)
- }.toSeq
- }
+ override def listTables(db: String, pattern: String): Seq[String] =
+ StringUtils.filterPattern(
+ cachesForSchema[Any,Any](ignite, Some(schemaOrDefault(db, currentSchema)))
+ .flatMap(_.getQueryEntities.map(_.getTableName)), pattern)
/** @inheritdoc */
override def loadTable(db: String, table: String,
@@ -161,9 +152,7 @@ private[ignite] class IgniteExternalCatalog(defaultIgniteContext: IgniteContext)
/** @inheritdoc */
override def listPartitionNames(db: String, table: String, partialSpec: Option[TablePartitionSpec]): Seq[String] = {
- val ignite = igniteOrDefault(db, default)
-
- sqlCacheName(ignite, table).map { cacheName ⇒
+ sqlCacheName(ignite, table, Some(schemaOrDefault(db, currentSchema))).map { cacheName ⇒
val parts = ignite.affinity(cacheName).partitions()
(0 until parts).map(_.toString)
@@ -173,14 +162,13 @@ private[ignite] class IgniteExternalCatalog(defaultIgniteContext: IgniteContext)
/** @inheritdoc */
override def listPartitions(db: String, table: String,
partialSpec: Option[TablePartitionSpec]): Seq[CatalogTablePartition] = {
- val ignite = igniteOrDefault(db, default)
val partitionNames = listPartitionNames(db, table, partialSpec)
if (partitionNames.isEmpty)
Seq.empty
else {
- val cacheName = sqlCacheName(ignite, table).get
+ val cacheName = sqlCacheName(ignite, table, Some(schemaOrDefault(db, currentSchema))).get
val aff = ignite.affinity[Any](cacheName)
@@ -271,13 +259,16 @@ private[ignite] class IgniteExternalCatalog(defaultIgniteContext: IgniteContext)
/** @inheritdoc */
override protected def doCreateTable(tableDefinition: CatalogTable, ignoreIfExists: Boolean): Unit = {
- val ignite = igniteOrDefault(tableDefinition.identifier.database.getOrElse(DEFAULT_DATABASE), default)
-
- igniteSQLTable(ignite, tableDefinition.identifier.table) match {
+ igniteSQLTable(ignite, tableDefinition.identifier.table, tableDefinition.identifier.database) match {
case Some(_) ⇒
/* no-op */
case None ⇒
+ val schema = tableDefinition.identifier.database
+
+ if(schema.isDefined && !schema.contains(DFLT_SCHEMA) && !schema.contains(DEFAULT_DATABASE))
+ throw new IgniteException("Can only create new tables in PUBLIC schema, not " + schema.get)
+
val props = tableDefinition.storage.properties
QueryHelper.createTable(tableDefinition.schema,
@@ -289,10 +280,8 @@ private[ignite] class IgniteExternalCatalog(defaultIgniteContext: IgniteContext)
}
/** @inheritdoc */
- override protected def doDropTable(db: String, tabName: String, ignoreIfNotExists: Boolean, purge: Boolean): Unit = {
- val ignite = igniteOrDefault(db, default)
-
- igniteSQLTable(ignite, tabName) match {
+ override protected def doDropTable(db: String, tabName: String, ignoreIfNotExists: Boolean, purge: Boolean): Unit =
+ igniteSQLTable(ignite, tabName, Some(schemaOrDefault(db, currentSchema))) match {
case Some(table) ⇒
val tableName = table.getTableName
@@ -302,7 +291,6 @@ private[ignite] class IgniteExternalCatalog(defaultIgniteContext: IgniteContext)
if (!ignoreIfNotExists)
throw new IgniteException(s"Table $tabName doesn't exists.")
}
- }
/** @inheritdoc */
override protected def doRenameTable(db: String, oldName: String, newName: String): Unit =
http://git-wip-us.apache.org/repos/asf/ignite/blob/b50a9dc1/modules/spark/src/main/scala/org/apache/spark/sql/ignite/IgniteOptimization.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/scala/org/apache/spark/sql/ignite/IgniteOptimization.scala b/modules/spark/src/main/scala/org/apache/spark/sql/ignite/IgniteOptimization.scala
index 2d97792..0455e86 100644
--- a/modules/spark/src/main/scala/org/apache/spark/sql/ignite/IgniteOptimization.scala
+++ b/modules/spark/src/main/scala/org/apache/spark/sql/ignite/IgniteOptimization.scala
@@ -73,7 +73,8 @@ object IgniteOptimization extends Rule[LogicalPlan] with Logging {
catalogTable = _catalogTable,
aliasIndex = aliasIndexIterator,
cacheName =
- sqlCacheName(igniteSqlRelation.ic.ignite(), igniteSqlRelation.tableName)
+ sqlCacheName(igniteSqlRelation.ic.ignite(), igniteSqlRelation.tableName,
+ igniteSqlRelation.schemaName)
.getOrElse(throw new IgniteException("Unknown table")))
//Logical Relation is bottomest TreeNode in LogicalPlan.
http://git-wip-us.apache.org/repos/asf/ignite/blob/b50a9dc1/modules/spark/src/test/scala/org/apache/ignite/spark/AbstractDataFrameSpec.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/test/scala/org/apache/ignite/spark/AbstractDataFrameSpec.scala b/modules/spark/src/test/scala/org/apache/ignite/spark/AbstractDataFrameSpec.scala
index a5ea25d..25d4a5c 100644
--- a/modules/spark/src/test/scala/org/apache/ignite/spark/AbstractDataFrameSpec.scala
+++ b/modules/spark/src/test/scala/org/apache/ignite/spark/AbstractDataFrameSpec.scala
@@ -117,9 +117,11 @@ abstract class AbstractDataFrameSpec extends FunSpec with Matchers with BeforeAn
cache.query(qry.setArgs(4L.asInstanceOf[JLong], "St. Petersburg")).getAll
}
- def createEmployeeCache(client: Ignite, cacheName: String): Unit = {
+ def createEmployeeCache(client: Ignite, cacheName: String, schemaName: Option[String] = None): Unit = {
val ccfg = AbstractDataFrameSpec.cacheConfiguration[String, Employee](cacheName)
+ schemaName.foreach(ccfg.setSqlSchema)
+
val cache = client.getOrCreateCache(ccfg)
cache.put("key1", Employee(1, "John Connor", 15, 0))
http://git-wip-us.apache.org/repos/asf/ignite/blob/b50a9dc1/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteCatalogSpec.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteCatalogSpec.scala b/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteCatalogSpec.scala
index d87d234..88e655d 100644
--- a/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteCatalogSpec.scala
+++ b/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteCatalogSpec.scala
@@ -19,12 +19,15 @@ package org.apache.ignite.spark
import java.lang.{Long ⇒ JLong}
+import org.apache.ignite.IgniteException
import org.apache.ignite.cache.query.SqlFieldsQuery
import org.apache.ignite.internal.IgnitionEx
import org.apache.ignite.internal.util.IgniteUtils.resolveIgnitePath
import org.apache.ignite.spark.AbstractDataFrameSpec.{DEFAULT_CACHE, EMPLOYEE_CACHE_NAME, TEST_CONFIG_FILE, enclose}
+import org.apache.spark.sql.catalyst.catalog.SessionCatalog
import org.apache.spark.sql.ignite.IgniteSparkSession
import org.apache.spark.sql.types.{LongType, StringType}
+import org.junit.Assert.assertEquals
import org.junit.runner.RunWith
import org.scalatest.junit.JUnitRunner
@@ -39,9 +42,35 @@ class IgniteCatalogSpec extends AbstractDataFrameSpec {
it("Should observe all available SQL tables") {
val tables = igniteSession.catalog.listTables.collect()
- tables.length should equal (3)
+ tables.length should equal(3)
- tables.map(_.name).sorted should equal (Array("CITY", "EMPLOYEE", "PERSON"))
+ tables.map(_.name).sorted should equal(Array("CITY", "EMPLOYEE", "PERSON"))
+ }
+
+ it("Should use the database context when providing tables") {
+ igniteSession.catalog.setCurrentDatabase("employeeSchema")
+
+ val employeeSchemaTables = igniteSession.catalog.listTables().collect()
+
+ employeeSchemaTables.map(_.name).sorted should equal(Array("EMPLOYEE"))
+
+ igniteSession.catalog.setCurrentDatabase("PUBLIC")
+
+ val publicSchemaTables = igniteSession.catalog.listTables().collect()
+
+ publicSchemaTables.map(_.name).sorted should equal(Array("CITY", "PERSON"))
+ }
+
+ it("Should provide table names given the PUBLIC schema") {
+ val tables = igniteSession.catalog.listTables("PUBLIC").collect()
+
+ tables.map(_.name).sorted should equal(Array("CITY", "PERSON"))
+ }
+
+ it("Should provide table names given a custom schema") {
+ val tables = igniteSession.catalog.listTables("employeeSchema").collect()
+
+ tables.map(_.name).sorted should equal(Array("EMPLOYEE"))
}
it("Should provide correct schema for SQL table") {
@@ -55,6 +84,12 @@ class IgniteCatalogSpec extends AbstractDataFrameSpec {
("NAME", StringType.catalogString, true)))
}
+ it("Should provide the list of all schemas") {
+ // The expected names are lower-case, e.g. `employeeschema` for the `employeeSchema` schema.
+ val databaseNames = igniteSession.catalog.listDatabases().collect().map(_.name)
+
+ databaseNames.sorted should equal(Array("cache3", "employeeschema", "public"))
+ }
+
it("Should provide ability to query SQL table without explicit registration") {
val res = igniteSession.sql("SELECT id, name FROM city").rdd
@@ -126,6 +161,41 @@ class IgniteCatalogSpec extends AbstractDataFrameSpec {
)
)
}
+
+ it("Should allow schema specification in the table name for public schema") {
+ // Qualify the table with its schema directly in the SQL text.
+ igniteSession.sql("SELECT id, name FROM public.city").rdd.count should equal(4)
+ }
+
+ it("Should allow schema specification in the table name for non-public schema") {
+ // `cache3` is a non-PUBLIC schema; the qualified name must still resolve.
+ val rowCount = igniteSession.sql("SELECT id, name, salary FROM cache3.employee").rdd.count
+
+ rowCount should equal(3)
+ }
+
+ it("Should allow Spark SQL to create a table") {
+ igniteSession.sql(
+ "CREATE TABLE NEW_SPARK_TABLE(id LONG, name STRING) USING JSON OPTIONS ('primaryKeyFields' = 'id')")
+
+ // Check membership directly so that a failure reports the actual table names.
+ val tableNames = igniteSession.catalog.listTables.collect().map(_.name)
+
+ tableNames should contain ("NEW_SPARK_TABLE")
+ }
+
+ it("Should disallow creation of tables in non-PUBLIC schemas") {
+ // Only the PUBLIC schema accepts new tables; `cache3` must be rejected.
+ val ex = intercept[IgniteException] {
+ igniteSession.sql(
+ "CREATE TABLE cache3.NEW_SPARK_TABLE(id LONG, name STRING) " +
+ "USING JSON OPTIONS ('primaryKeyFields' = 'id')")
+ }
+
+ // JUnit's assertEquals is (expected, actual); this order keeps failure messages accurate.
+ assertEquals("Can only create new tables in PUBLIC schema, not cache3", ex.getMessage)
+ }
+ }
+
+ // Some tests switch the session's current database via setCurrentDatabase. Reset it before
+ // each test so that a changed database does not leak into tests that rely on the default.
+ before {
+ igniteSession.catalog.setCurrentDatabase(SessionCatalog.DEFAULT_DATABASE)
}
override protected def beforeAll(): Unit = {
@@ -137,6 +207,8 @@ class IgniteCatalogSpec extends AbstractDataFrameSpec {
createEmployeeCache(client, EMPLOYEE_CACHE_NAME)
+ createEmployeeCache(client, "myEmployeeCache", Some("employeeSchema"))
+
val configProvider = enclose(null) (_ ⇒ () ⇒ {
val cfg = IgnitionEx.loadConfiguration(TEST_CONFIG_FILE).get1()
http://git-wip-us.apache.org/repos/asf/ignite/blob/b50a9dc1/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteSQLDataFrameSpec.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteSQLDataFrameSpec.scala b/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteSQLDataFrameSpec.scala
index 58c63b6..9593c8e 100644
--- a/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteSQLDataFrameSpec.scala
+++ b/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteSQLDataFrameSpec.scala
@@ -17,6 +17,7 @@
package org.apache.ignite.spark
+import com.google.common.collect.Iterators
import org.apache.ignite.spark.AbstractDataFrameSpec.TEST_CONFIG_FILE
import org.apache.ignite.spark.IgniteDataFrameSettings._
import org.apache.spark.sql.DataFrame
@@ -270,6 +271,44 @@ class IgniteSQLDataFrameSpec extends AbstractDataFrameSpec {
persons(0).getAs[Long]("city_id") should equal(2)
persons(0).getAs[Long]("count(1)") should equal(3)
}
+
+ it("should use the schema name where one is specified") {
+ // Loads the Ignite `employee` table from `schema` and registers it as the Spark temp view `view`.
+ def registerEmployeeView(schema: String, view: String): Unit =
+ spark.read
+ .format(FORMAT_IGNITE)
+ .option(OPTION_CONFIG_FILE, TEST_CONFIG_FILE)
+ .option(OPTION_TABLE, "employee")
+ .option(OPTION_SCHEMA, schema)
+ .load()
+ .createOrReplaceTempView(view)
+
+ // `employeeCache1` is created in the schema named after the cache itself, i.e. `employeeCache1`.
+ createEmployeeCache(client, "employeeCache1")
+ registerEmployeeView("employeeCache1", "employeeWithSchema")
+
+ // `employeeCache2` is created in the explicitly requested `employeeSchema` schema.
+ createEmployeeCache(client, "employeeCache2", Some("employeeSchema"))
+
+ Iterators.size(client.cache("employeeCache2").iterator()) should equal(3)
+
+ // Remove one entry so the two views differ (2 rows versus 3 in `employeeCache1`). This shows
+ // that each view reads the cache behind the schema it was loaded from.
+ client.cache("employeeCache2").remove("key1") shouldBe true
+
+ registerEmployeeView("employeeSchema", "employeeWithSchema2")
+
+ spark.sqlContext.sql("SELECT id FROM employeeWithSchema").rdd.count should equal(3)
+
+ spark.sqlContext.sql("SELECT id FROM employeeWithSchema2").rdd.count should equal(2)
+ }
}
override protected def beforeAll(): Unit = {
http://git-wip-us.apache.org/repos/asf/ignite/blob/b50a9dc1/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteSQLDataFrameWriteSpec.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteSQLDataFrameWriteSpec.scala b/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteSQLDataFrameWriteSpec.scala
index 30df27e..8d6f6fb 100644
--- a/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteSQLDataFrameWriteSpec.scala
+++ b/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteSQLDataFrameWriteSpec.scala
@@ -24,6 +24,7 @@ import org.apache.ignite.spark.impl.sqlTableInfo
import org.apache.ignite.testframework.GridTestUtils.resolveIgnitePath
import org.apache.spark.sql.SaveMode.{Append, Ignore, Overwrite}
import org.apache.spark.sql.{DataFrame, SaveMode}
+import org.junit.Assert.assertEquals
import org.junit.runner.RunWith
import org.scalatest.junit.JUnitRunner
import org.apache.spark.sql.functions._
@@ -220,7 +221,7 @@ class IgniteSQLDataFrameWriteSpec extends AbstractDataFrameSpec {
.save()
}
- val tblInfo = sqlTableInfo[Any, Any](client, PERSON_TBL_NAME)
+ val tblInfo = sqlTableInfo[Any, Any](client, PERSON_TBL_NAME, None)
assert(tblInfo.isDefined, s"Table $PERSON_TBL_NAME should exists.")
}
@@ -236,7 +237,7 @@ class IgniteSQLDataFrameWriteSpec extends AbstractDataFrameSpec {
.save()
}
- val tblInfo = sqlTableInfo[Any, Any](client, PERSON_TBL_NAME)
+ val tblInfo = sqlTableInfo[Any, Any](client, PERSON_TBL_NAME, None)
assert(tblInfo.isDefined, s"Table $PERSON_TBL_NAME should exists.")
}
@@ -252,7 +253,7 @@ class IgniteSQLDataFrameWriteSpec extends AbstractDataFrameSpec {
.save()
}
- val tblInfo = sqlTableInfo[Any, Any](client, PERSON_TBL_NAME)
+ val tblInfo = sqlTableInfo[Any, Any](client, PERSON_TBL_NAME, None)
assert(tblInfo.isDefined, s"Table $PERSON_TBL_NAME should exists.")
}
@@ -332,6 +333,22 @@ class IgniteSQLDataFrameWriteSpec extends AbstractDataFrameSpec {
.save()
}
}
+
+ it("Should throw exception if saving data frame as a new table with non-PUBLIC schema") {
+ // Saving to a table that does not exist yet triggers table creation, which must be
+ // refused for any schema other than the default one.
+ val ex = intercept[IgniteException] {
+ personDataFrame.write
+ .format(FORMAT_IGNITE)
+ .option(OPTION_CONFIG_FILE, TEST_CONFIG_FILE)
+ .option(OPTION_TABLE, "nonexistant-table-name")
+ .option(OPTION_CREATE_TABLE_PRIMARY_KEY_FIELDS, "id")
+ .option(OPTION_SCHEMA, "mySchema")
+ .save()
+ }
+
+ // JUnit's assertEquals is (expected, actual); this order keeps failure messages accurate.
+ assertEquals(
+ "Creating new tables in schema mySchema is not valid, tables must only be created in " +
+ org.apache.ignite.internal.processors.query.QueryUtils.DFLT_SCHEMA,
+ ex.getMessage)
+ }
}
override protected def beforeAll(): Unit = {