Posted to commits@ignite.apache.org by ni...@apache.org on 2018/10/08 15:57:36 UTC

ignite git commit: IGNITE-9228: Spark SQL Table Schema Specification - Fixes #4551.

Repository: ignite
Updated Branches:
  refs/heads/master 5f055c7f5 -> b50a9dc18


IGNITE-9228: Spark SQL Table Schema Specification - Fixes #4551.

Signed-off-by: Nikolay Izhikov <ni...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/b50a9dc1
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/b50a9dc1
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/b50a9dc1

Branch: refs/heads/master
Commit: b50a9dc18a115d5fc599ac15a11546911ae0bc45
Parents: 5f055c7
Author: stuartmacd <29...@users.noreply.github.com>
Authored: Mon Oct 8 18:21:04 2018 +0300
Committer: Nikolay Izhikov <ni...@apache.org>
Committed: Mon Oct 8 18:46:40 2018 +0300

----------------------------------------------------------------------
 .../ignite/spark/IgniteDataFrameSettings.scala  | 16 +++-
 .../spark/impl/IgniteRelationProvider.scala     | 17 +++-
 .../ignite/spark/impl/IgniteSQLRelation.scala   | 12 +--
 .../apache/ignite/spark/impl/QueryHelper.scala  | 13 ++-
 .../org/apache/ignite/spark/impl/package.scala  | 95 ++++++++++----------
 .../sql/ignite/IgniteExternalCatalog.scala      | 92 +++++++++----------
 .../spark/sql/ignite/IgniteOptimization.scala   |  3 +-
 .../ignite/spark/AbstractDataFrameSpec.scala    |  4 +-
 .../apache/ignite/spark/IgniteCatalogSpec.scala | 76 +++++++++++++++-
 .../ignite/spark/IgniteSQLDataFrameSpec.scala   | 39 ++++++++
 .../spark/IgniteSQLDataFrameWriteSpec.scala     | 23 ++++-
 11 files changed, 274 insertions(+), 116 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/b50a9dc1/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteDataFrameSettings.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteDataFrameSettings.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteDataFrameSettings.scala
index 9daaec4..e176721 100644
--- a/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteDataFrameSettings.scala
+++ b/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteDataFrameSettings.scala
@@ -55,6 +55,20 @@ object IgniteDataFrameSettings {
     val OPTION_TABLE = "table"
 
     /**
+      * Config option to specify the Ignite SQL schema name in which the specified table is present.
+      * If not specified, all schemata are scanned for a table whose name matches the given table name,
+      * and the first matching table is used. This option can be used to disambiguate when tables with
+      * the same name exist in multiple schemata.
+      *
+      * @example {{{
+      * val igniteDF = spark.read.format(IGNITE)
+      *     .option(OPTION_TABLE, "myTable")
+      *     .option(OPTION_SCHEMA, "mySchema")
+      * }}}
+      */
+    val OPTION_SCHEMA = "schema"
+
+    /**
       * Config option to specify newly created Ignite SQL table parameters.
      * The value of this option will be used in `CREATE TABLE ...  WITH "option value goes here"`
       *
@@ -123,7 +137,7 @@ object IgniteDataFrameSettings {
     /**
       * Config option for saving data frame.
       * Internally all SQL inserts are done through `IgniteDataStreamer`.
-      * This options sets perNodeBufferSize` property of streamer.
+      * This options sets `perNodeBufferSize` property of streamer.
       *
       * @example {{{
       * val igniteDF = spark.write.format(IGNITE)
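
For reference, a complete read using the new schema option might look like the
following sketch (the config file path and the table/schema names here are
illustrative, not taken from this commit):

    // Read an Ignite SQL table, disambiguating by schema; without
    // OPTION_SCHEMA the first table matching the name in any schema is used.
    val igniteDF = spark.read
        .format(FORMAT_IGNITE)
        .option(OPTION_CONFIG_FILE, "/path/to/ignite-config.xml") // illustrative path
        .option(OPTION_TABLE, "EMPLOYEE")
        .option(OPTION_SCHEMA, "MYSCHEMA")
        .load()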

http://git-wip-us.apache.org/repos/asf/ignite/blob/b50a9dc1/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteRelationProvider.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteRelationProvider.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteRelationProvider.scala
index e4fa9f7..71195d5 100644
--- a/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteRelationProvider.scala
+++ b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteRelationProvider.scala
@@ -55,12 +55,14 @@ class IgniteRelationProvider extends RelationProvider
       * @see IgniteRelation
       * @see IgnitionEx#grid(String)
       * @see org.apache.ignite.spark.IgniteDataFrameSettings.OPTION_TABLE
+      * @see org.apache.ignite.spark.IgniteDataFrameSettings.OPTION_SCHEMA
       * @see org.apache.ignite.spark.IgniteDataFrameSettings.OPTION_CONFIG_FILE
       */
     override def createRelation(sqlCtx: SQLContext, params: Map[String, String]): BaseRelation =
         createRelation(
             igniteContext(params, sqlCtx),
             params.getOrElse(OPTION_TABLE, throw new IgniteException("'table' must be specified.")),
+            params.get(OPTION_SCHEMA),
             sqlCtx)
 
     /**
@@ -101,7 +103,7 @@ class IgniteRelationProvider extends RelationProvider
 
         val tblName = tableName(params)
 
-        val tblInfoOption = sqlTableInfo[Any, Any](ctx.ignite(), tblName)
+        val tblInfoOption = sqlTableInfo[Any, Any](ctx.ignite(), tblName, params.get(OPTION_SCHEMA))
 
         if (tblInfoOption.isDefined) {
             mode match {
@@ -120,6 +122,7 @@ class IgniteRelationProvider extends RelationProvider
 
                     saveTable(data,
                         tblName,
+                        params.get(OPTION_SCHEMA),
                         ctx,
                         params.get(OPTION_STREAMER_ALLOW_OVERWRITE).map(_.toBoolean),
                         params.get(OPTION_STREAMER_FLUSH_FREQUENCY).map(_.toLong),
@@ -129,6 +132,7 @@ class IgniteRelationProvider extends RelationProvider
                 case Append ⇒
                     saveTable(data,
                         tblName,
+                        params.get(OPTION_SCHEMA),
                         ctx,
                         params.get(OPTION_STREAMER_ALLOW_OVERWRITE).map(_.toBoolean),
                         params.get(OPTION_STREAMER_FLUSH_FREQUENCY).map(_.toLong),
@@ -159,6 +163,7 @@ class IgniteRelationProvider extends RelationProvider
 
             saveTable(data,
                 tblName,
+                params.get(OPTION_SCHEMA),
                 ctx,
                 params.get(OPTION_STREAMER_ALLOW_OVERWRITE).map(_.toBoolean),
                 params.get(OPTION_STREAMER_FLUSH_FREQUENCY).map(_.toLong),
@@ -168,25 +173,28 @@ class IgniteRelationProvider extends RelationProvider
 
         createRelation(ctx,
             tblName,
+            params.get(OPTION_SCHEMA),
             sqlCtx)
     }
 
     /**
       * @param igniteCtx Ignite context.
       * @param tblName Table name.
+      * @param schema Optional schema name.
       * @param sqlCtx SQL context.
       * @return Ignite SQL relation.
       */
-    private def createRelation(igniteCtx: IgniteContext, tblName: String, sqlCtx: SQLContext): BaseRelation = {
+    private def createRelation(igniteCtx: IgniteContext, tblName: String, schema: Option[String], sqlCtx: SQLContext):
+    BaseRelation = {
         val optimizationDisabled =
             sqlCtx.sparkSession.conf.get(OPTION_DISABLE_SPARK_SQL_OPTIMIZATION, "false").toBoolean
 
         val experimentalMethods = sqlCtx.sparkSession.sessionState.experimentalMethods
 
         if (optimizationDisabled) {
-            experimentalMethods.extraOptimizations = 
+            experimentalMethods.extraOptimizations =
                 experimentalMethods.extraOptimizations.filter(_ != IgniteOptimization)
-        } 
+        }
         else {
             val optimizationExists = experimentalMethods.extraOptimizations.contains(IgniteOptimization)
 
@@ -197,6 +205,7 @@ class IgniteRelationProvider extends RelationProvider
         IgniteSQLRelation(
             igniteCtx,
             tblName,
+            schema,
             sqlCtx)
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/b50a9dc1/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteSQLRelation.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteSQLRelation.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteSQLRelation.scala
index 1b4f277..cabf311 100644
--- a/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteSQLRelation.scala
+++ b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteSQLRelation.scala
@@ -34,14 +34,15 @@ import scala.collection.JavaConversions._
   */
 class IgniteSQLRelation[K, V](
     private[apache] val ic: IgniteContext,
-    private[apache] val tableName: String)
+    private[apache] val tableName: String,
+    private[apache] val schemaName: Option[String])
     (@transient val sqlContext: SQLContext) extends BaseRelation with PrunedFilteredScan with Logging {
 
     /**
       * @return Schema of Ignite SQL table.
       */
     override def schema: StructType =
-        igniteSQLTable(ic.ignite(), tableName)
+        igniteSQLTable(ic.ignite(), tableName, schemaName)
             .map(IgniteSQLRelation.schema)
             .getOrElse(throw new IgniteException(s"Unknown table $tableName"))
 
@@ -101,7 +102,7 @@ class IgniteSQLRelation[K, V](
       * Cache name for a table name.
       */
     private lazy val cacheName: String =
-        sqlCacheName(ic.ignite(), tableName)
+        sqlCacheName(ic.ignite(), tableName, schemaName)
             .getOrElse(throw new IgniteException(s"Unknown table $tableName"))
 }
 
@@ -126,6 +127,7 @@ object IgniteSQLRelation {
         })
     }
 
-    def apply[K, V](ic: IgniteContext, tableName: String, sqlContext: SQLContext): IgniteSQLRelation[K, V] =
-        new IgniteSQLRelation[K, V](ic,tableName)(sqlContext)
+    def apply[K, V](ic: IgniteContext, tableName: String, schemaName: Option[String],
+        sqlContext: SQLContext): IgniteSQLRelation[K, V] =
+        new IgniteSQLRelation[K, V](ic, tableName, schemaName)(sqlContext)
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/b50a9dc1/modules/spark/src/main/scala/org/apache/ignite/spark/impl/QueryHelper.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/impl/QueryHelper.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/QueryHelper.scala
index 4342265..fc2a40d 100644
--- a/modules/spark/src/main/scala/org/apache/ignite/spark/impl/QueryHelper.scala
+++ b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/QueryHelper.scala
@@ -21,6 +21,7 @@ import org.apache.ignite.cache.query.SqlFieldsQuery
 import org.apache.ignite.spark.IgniteDataFrameSettings._
 import QueryUtils.{compileCreateTable, compileDropTable, compileInsert}
 import org.apache.ignite.internal.IgniteEx
+import org.apache.ignite.internal.processors.query.QueryUtils.DFLT_SCHEMA
 import org.apache.ignite.spark.IgniteContext
 import org.apache.ignite.{Ignite, IgniteException}
 import org.apache.spark.sql.types.StructType
@@ -72,6 +73,11 @@ private[apache] object QueryHelper {
         if (!params.contains(OPTION_TABLE) && !params.contains("path"))
             throw new IgniteException("'table' must be specified.")
 
+        if (params.contains(OPTION_SCHEMA) && !params(OPTION_SCHEMA).equalsIgnoreCase(DFLT_SCHEMA)) {
+            throw new IgniteException("Creating new tables in schema " + params(OPTION_SCHEMA) + " is not valid, tables"
+                + " must only be created in " + DFLT_SCHEMA)
+        }
+
         params.get(OPTION_CREATE_TABLE_PRIMARY_KEY_FIELDS)
             .map(_.split(','))
             .getOrElse(throw new IgniteException("Can't create table! Primary key fields has to be specified."))
@@ -91,6 +97,7 @@ private[apache] object QueryHelper {
       *
       * @param data Data.
       * @param tblName Table name.
+      * @param schemaName Optional schema name.
       * @param ctx Ignite context.
       * @param streamerAllowOverwrite Flag enabling overwriting existing values in cache.
       * @param streamerFlushFrequency Insert query streamer automatic flush frequency.
@@ -105,6 +112,7 @@ private[apache] object QueryHelper {
       */
     def saveTable(data: DataFrame,
         tblName: String,
+        schemaName: Option[String],
         ctx: IgniteContext,
         streamerAllowOverwrite: Option[Boolean],
         streamerFlushFrequency: Option[Long],
@@ -117,6 +125,7 @@ private[apache] object QueryHelper {
             savePartition(iterator,
                 insertQry,
                 tblName,
+                schemaName,
                 ctx,
                 streamerAllowOverwrite,
                 streamerFlushFrequency,
@@ -131,6 +140,7 @@ private[apache] object QueryHelper {
       * @param iterator Data iterator.
       * @param insertQry Insert query.
       * @param tblName Table name.
+      * @param schemaName Optional schema name.
       * @param ctx Ignite context.
       * @param streamerAllowOverwrite Flag enabling overwriting existing values in cache.
       * @param streamerFlushFrequency Insert query streamer automatic flush frequency.
@@ -146,13 +156,14 @@ private[apache] object QueryHelper {
     private def savePartition(iterator: Iterator[Row],
         insertQry: String,
         tblName: String,
+        schemaName: Option[String],
         ctx: IgniteContext,
         streamerAllowOverwrite: Option[Boolean],
         streamerFlushFrequency: Option[Long],
         streamerPerNodeBufferSize: Option[Int],
         streamerPerNodeParallelOperations: Option[Int]
     ): Unit = {
-        val tblInfo = sqlTableInfo[Any, Any](ctx.ignite(), tblName).get
+        val tblInfo = sqlTableInfo[Any, Any](ctx.ignite(), tblName, schemaName).get
 
         val streamer = ctx.ignite().dataStreamer(tblInfo._1.getName)
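
The new check in ensureCreateTableOptions means a data frame can only be saved
as a new table in the default (PUBLIC) schema. A sketch of a write that the
check now rejects (the table and schema names are illustrative; personDataFrame
and TEST_CONFIG_FILE are as in the tests below):

    // Saving to a non-PUBLIC schema at table-creation time now throws
    // IgniteException("Creating new tables in schema ... is not valid, ...").
    personDataFrame.write
        .format(FORMAT_IGNITE)
        .option(OPTION_CONFIG_FILE, TEST_CONFIG_FILE)
        .option(OPTION_TABLE, "new_table")
        .option(OPTION_CREATE_TABLE_PRIMARY_KEY_FIELDS, "id")
        .option(OPTION_SCHEMA, "mySchema") // anything other than PUBLIC is rejected
        .save()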
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/b50a9dc1/modules/spark/src/main/scala/org/apache/ignite/spark/impl/package.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/impl/package.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/package.scala
index be7b366..36ec956 100644
--- a/modules/spark/src/main/scala/org/apache/ignite/spark/impl/package.scala
+++ b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/package.scala
@@ -18,10 +18,11 @@
 package org.apache.ignite.spark
 
 import org.apache.commons.lang.StringUtils.equalsIgnoreCase
-import org.apache.ignite.{Ignite, IgniteException, IgniteState, Ignition}
+import org.apache.ignite.{Ignite, Ignition}
 import org.apache.ignite.cache.{CacheMode, QueryEntity}
 import org.apache.ignite.cluster.ClusterNode
 import org.apache.ignite.configuration.CacheConfiguration
+import org.apache.ignite.internal.processors.query.QueryUtils.normalizeSchemaName
 import org.apache.ignite.internal.util.lang.GridFunc.contains
 import org.apache.spark.Partition
 import org.apache.spark.sql.catalyst.catalog.SessionCatalog
@@ -31,26 +32,6 @@ import scala.collection.mutable.ArrayBuffer
 
 package object impl {
     /**
-      * Checks named instance of Ignite exists.
-      * Throws IgniteException if not.
-      *
-      * @param gridName Name of grid.
-      */
-    def ensureIgnite(gridName: String): Unit =
-        if (!igniteExists(gridName))
-            throw new IgniteException(s"Ignite grid with name '$gridName' does not exist.")
-
-    /**
-      * @param gridName Name of grid.
-      * @return True if named instance of Ignite exists false otherwise.
-      */
-    def igniteExists(gridName: String): Boolean =
-        if (gridName == "")
-            Ignition.state() == IgniteState.STARTED
-        else
-            Ignition.state(gridName) == IgniteState.STARTED
-
-    /**
       * @param g Ignite.
       * @return Name of Ignite. If name is null empty string returned.
       */
@@ -61,19 +42,15 @@ package object impl {
             ""
 
     /**
-      * @param name Name of grid..
-      * @param default Default instance.
-      * @return Named grid instance if it exists. If not default instance returned.
+      * @param schema Name of schema.
+      * @param default Default schema.
+      * @return Schema to use.
       */
-    def igniteOrDefault(name: String, default: Ignite): Ignite =
-        if (name == SessionCatalog.DEFAULT_DATABASE) {
-            if (igniteExists(name))
-                ignite(name)
-            else
-                default
-        }
+    def schemaOrDefault(schema: String, default: String): String =
+        if (schema == SessionCatalog.DEFAULT_DATABASE)
+            default
         else
-            ignite(name)
+            schema
 
     /**
       * @param gridName Name of grid.
@@ -88,42 +65,68 @@ package object impl {
     /**
       * @param ignite Ignite instance.
       * @param tabName Table name.
+      * @param schemaName Optional schema name.
       * @return True if table exists false otherwise.
       */
-    def sqlTableExists(ignite: Ignite, tabName: String): Boolean =
-        sqlTableInfo(ignite, tabName).isDefined
+    def sqlTableExists(ignite: Ignite, tabName: String, schemaName: Option[String]): Boolean =
+        sqlTableInfo(ignite, tabName, schemaName).isDefined
 
     /**
       * @param ignite Ignite instance.
       * @param tabName Table name.
+      * @param schemaName Optional schema name.
       * @return QueryEntity for a given table.
       */
-    def igniteSQLTable(ignite: Ignite, tabName: String): Option[QueryEntity] =
-        sqlTableInfo[Any, Any](ignite, tabName).map(_._2)
+    def igniteSQLTable(ignite: Ignite, tabName: String, schemaName: Option[String]): Option[QueryEntity] =
+        sqlTableInfo[Any, Any](ignite, tabName, schemaName).map(_._2)
 
     /**
       * @param ignite Ignite instance.
       * @param tabName Table name.
+      * @param schemaName Optional schema name.
       * @return Cache name for given table.
       */
-    def sqlCacheName(ignite: Ignite, tabName: String): Option[String] =
-        sqlTableInfo[Any, Any](ignite, tabName).map(_._1.getName)
+    def sqlCacheName(ignite: Ignite, tabName: String, schemaName: Option[String]): Option[String] =
+        sqlTableInfo[Any, Any](ignite, tabName, schemaName).map(_._1.getName)
+
+    /**
+      * @param ignite Ignite instance.
+      * @return All schemas in given Ignite instance.
+      */
+    def allSchemas(ignite: Ignite): Seq[String] = ignite.cacheNames()
+        .map(name =>
+            normalizeSchemaName(name,
+                ignite.cache[Any,Any](name).getConfiguration(classOf[CacheConfiguration[Any,Any]]).getSqlSchema))
+        .toSeq
+        .distinct
+
+    /**
+      * @param ignite Ignite instance.
+      * @param schemaName Schema name.
+      * @return All cache configurations for the given schema.
+      */
+    def cachesForSchema[K,V](ignite: Ignite, schemaName: Option[String]): Seq[CacheConfiguration[K,V]] =
+        ignite.cacheNames
+            .map(ignite.cache[K,V](_).getConfiguration(classOf[CacheConfiguration[K,V]]))
+            .filter(ccfg =>
+                schemaName.forall(normalizeSchemaName(ccfg.getName, ccfg.getSqlSchema).equalsIgnoreCase(_)) ||
+                schemaName.contains(SessionCatalog.DEFAULT_DATABASE))
+            .toSeq
 
     /**
       * @param ignite Ignite instance.
       * @param tabName Table name.
+      * @param schemaName Optional schema name.
       * @tparam K Key class.
       * @tparam V Value class.
       * @return CacheConfiguration and QueryEntity for a given table.
       */
-    def sqlTableInfo[K, V](ignite: Ignite, tabName: String): Option[(CacheConfiguration[K, V], QueryEntity)] =
-        ignite.cacheNames().map { cacheName ⇒
-            val ccfg = ignite.cache[K, V](cacheName).getConfiguration(classOf[CacheConfiguration[K, V]])
-
-            val queryEntities = ccfg.getQueryEntities
-
-            queryEntities.find(_.getTableName.equalsIgnoreCase(tabName)).map(qe ⇒ (ccfg, qe))
-        }.find(_.isDefined).flatten
+    def sqlTableInfo[K, V](ignite: Ignite, tabName: String,
+        schemaName: Option[String]): Option[(CacheConfiguration[K, V], QueryEntity)] =
+        cachesForSchema[K,V](ignite, schemaName)
+            .map(ccfg ⇒ ccfg.getQueryEntities.find(_.getTableName.equalsIgnoreCase(tabName)).map(qe ⇒ (ccfg, qe)))
+            .find(_.isDefined)
+            .flatten
 
     /**
       * @param table Table.
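
A sketch of how the schema-aware helpers added here resolve tables (the table
and schema names are illustrative): passing None preserves the old behaviour of
scanning every schema and taking the first match, while Some(schema) restricts
the lookup to that schema.

    // First match across all schemata (pre-existing behaviour).
    val anyMatch = sqlTableInfo[Any, Any](ignite, "EMPLOYEE", None)

    // Match only within the given schema.
    val scoped = sqlTableInfo[Any, Any](ignite, "EMPLOYEE", Some("EMPLOYEESCHEMA"))

    // All schemata known to this Ignite instance, e.g. Seq("PUBLIC", ...).
    val schemata = allSchemas(ignite)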

http://git-wip-us.apache.org/repos/asf/ignite/blob/b50a9dc1/modules/spark/src/main/scala/org/apache/spark/sql/ignite/IgniteExternalCatalog.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/scala/org/apache/spark/sql/ignite/IgniteExternalCatalog.scala b/modules/spark/src/main/scala/org/apache/spark/sql/ignite/IgniteExternalCatalog.scala
index f632d2a..9e8290c 100644
--- a/modules/spark/src/main/scala/org/apache/spark/sql/ignite/IgniteExternalCatalog.scala
+++ b/modules/spark/src/main/scala/org/apache/spark/sql/ignite/IgniteExternalCatalog.scala
@@ -19,12 +19,12 @@ package org.apache.spark.sql.ignite
 
 import java.net.URI
 
-import org.apache.ignite.configuration.CacheConfiguration
+import org.apache.ignite.internal.processors.query.QueryUtils.DFLT_SCHEMA
 import org.apache.ignite.spark.IgniteDataFrameSettings.OPTION_TABLE
 import org.apache.ignite.spark.IgniteContext
 import org.apache.ignite.spark.IgniteDataFrameSettings._
 import org.apache.ignite.spark.impl.IgniteSQLRelation.schema
-import org.apache.ignite.{Ignite, IgniteException, Ignition}
+import org.apache.ignite.{Ignite, IgniteException}
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
@@ -41,14 +41,16 @@ import scala.collection.JavaConversions._
 /**
   * External catalog implementation to provide transparent access to SQL tables existed in Ignite.
   *
-  * @param defaultIgniteContext Ignite context to provide access to Ignite instance. If <code>None</code> passed then no-name instance of Ignite used.
+  * @param igniteContext Ignite context to provide access to Ignite instance.
   */
-private[ignite] class IgniteExternalCatalog(defaultIgniteContext: IgniteContext)
+private[ignite] class IgniteExternalCatalog(igniteContext: IgniteContext)
     extends ExternalCatalog {
     /**
       * Default Ignite instance.
       */
-    @transient private var default: Ignite = defaultIgniteContext.ignite
+    @transient private val ignite: Ignite = igniteContext.ignite()
+
+    @transient private var currentSchema = DEFAULT_DATABASE
 
     /**
       * @param db Ignite instance name.
@@ -58,56 +60,52 @@ private[ignite] class IgniteExternalCatalog(defaultIgniteContext: IgniteContext)
         CatalogDatabase(db, db, IGNITE_URI, Map.empty)
 
     /**
-      * Checks Ignite instance with provided name exists.
-      * If <code>db == SessionCatalog.DEFAULT_DATABASE</code> checks for a default Ignite instance.
+      * Checks Ignite schema with provided name exists.
       *
-      * @param db Ignite instance name or <code>SessionCatalog.DEFAULT_DATABASE</code>.
-      * @return True is Ignite instance exists.
+      * @param schema Ignite schema name or <code>SessionCatalog.DEFAULT_DATABASE</code>.
+      * @return True if the Ignite schema exists.
       */
-    override def databaseExists(db: String): Boolean =
-        db == DEFAULT_DATABASE || igniteExists(db)
+    override def databaseExists(schema: String): Boolean =
+        schema == DEFAULT_DATABASE || allSchemas(ignite).exists(schema.equalsIgnoreCase)
 
     /**
-      * @return List of all known Ignite instances names.
+      * @return List of all known Ignite schemas.
       */
     override def listDatabases(): Seq[String] =
-        Ignition.allGrids().map(igniteName)
+        allSchemas(ignite)
 
     /**
       * @param pattern Pattern to filter databases names.
-      * @return List of all known Ignite instances names filtered by pattern.
+      * @return List of all known Ignite schema names filtered by pattern.
       */
     override def listDatabases(pattern: String): Seq[String] =
         StringUtils.filterPattern(listDatabases(), pattern)
 
     /**
-      * Sets default Ignite instance.
+      * Sets default Ignite schema.
       *
-      * @param db Name of Ignite instance.
+      * @param schema Name of Ignite schema.
       */
-    override def setCurrentDatabase(db: String): Unit = {
-        ensureIgnite(db)
-
-        default = ignite(db)
-    }
+    override def setCurrentDatabase(schema: String): Unit =
+        currentSchema = schema
 
     /** @inheritdoc */
     override def getTable(db: String, table: String): CatalogTable = getTableOption(db, table).get
 
-	def getTableOption(db: String, tabName: String): Option[CatalogTable] = {
-        val ignite = igniteOrDefault(db, default)
-
+    def getTableOption(db: String, tabName: String): Option[CatalogTable] = {
         val gridName = igniteName(ignite)
 
-        igniteSQLTable(ignite, tabName) match {
+        val schemaName = schemaOrDefault(db, currentSchema)
+
+        igniteSQLTable(ignite, tabName, Some(db)) match {
             case Some(table) ⇒
                 val tableName = table.getTableName
 
                 Some(new CatalogTable(
-                    identifier = new TableIdentifier(tableName, Some(gridName)),
+                    identifier = new TableIdentifier(tableName, Some(schemaName)),
                     tableType = CatalogTableType.EXTERNAL,
                     storage = CatalogStorageFormat(
-                        locationUri = Some(URI.create(IGNITE_PROTOCOL + tableName)),
+                        locationUri = Some(URI.create(IGNITE_PROTOCOL + schemaName + "/" + tableName)),
                         inputFormat = Some(FORMAT_IGNITE),
                         outputFormat = Some(FORMAT_IGNITE),
                         serde = None,
@@ -130,23 +128,16 @@ private[ignite] class IgniteExternalCatalog(defaultIgniteContext: IgniteContext)
 
     /** @inheritdoc */
     override def tableExists(db: String, table: String): Boolean =
-        sqlTableExists(igniteOrDefault(db, default), table)
+        sqlTableExists(ignite, table, Some(schemaOrDefault(db, currentSchema)))
 
     /** @inheritdoc */
     override def listTables(db: String): Seq[String] = listTables(db, ".*")
 
     /** @inheritdoc */
-    override def listTables(db: String, pattern: String): Seq[String] = {
-        val ignite = igniteOrDefault(db, default)
-
-        ignite.cacheNames.flatten { name =>
-            val cache = ignite.cache[Any, Any](name)
-
-            val ccfg = cache.getConfiguration(classOf[CacheConfiguration[Any, Any]])
-
-            ccfg.getQueryEntities.map(_.getTableName)
-        }.toSeq
-    }
+    override def listTables(db: String, pattern: String): Seq[String] =
+        StringUtils.filterPattern(
+            cachesForSchema[Any,Any](ignite, Some(schemaOrDefault(db, currentSchema)))
+                .flatMap(_.getQueryEntities.map(_.getTableName)), pattern)
 
     /** @inheritdoc */
     override def loadTable(db: String, table: String,
@@ -161,9 +152,7 @@ private[ignite] class IgniteExternalCatalog(defaultIgniteContext: IgniteContext)
 
     /** @inheritdoc */
     override def listPartitionNames(db: String, table: String, partialSpec: Option[TablePartitionSpec]): Seq[String] = {
-        val ignite = igniteOrDefault(db, default)
-
-        sqlCacheName(ignite, table).map { cacheName ⇒
+        sqlCacheName(ignite, table, Some(schemaOrDefault(db, currentSchema))).map { cacheName ⇒
             val parts = ignite.affinity(cacheName).partitions()
 
             (0 until parts).map(_.toString)
@@ -173,14 +162,13 @@ private[ignite] class IgniteExternalCatalog(defaultIgniteContext: IgniteContext)
     /** @inheritdoc */
     override def listPartitions(db: String, table: String,
         partialSpec: Option[TablePartitionSpec]): Seq[CatalogTablePartition] = {
-        val ignite = igniteOrDefault(db, default)
 
         val partitionNames = listPartitionNames(db, table, partialSpec)
 
         if (partitionNames.isEmpty)
             Seq.empty
         else {
-            val cacheName = sqlCacheName(ignite, table).get
+            val cacheName = sqlCacheName(ignite, table, Some(schemaOrDefault(db, currentSchema))).get
 
             val aff = ignite.affinity[Any](cacheName)
 
@@ -271,13 +259,16 @@ private[ignite] class IgniteExternalCatalog(defaultIgniteContext: IgniteContext)
 
     /** @inheritdoc */
     override protected def doCreateTable(tableDefinition: CatalogTable, ignoreIfExists: Boolean): Unit = {
-        val ignite = igniteOrDefault(tableDefinition.identifier.database.getOrElse(DEFAULT_DATABASE), default)
-
-        igniteSQLTable(ignite, tableDefinition.identifier.table) match {
+        igniteSQLTable(ignite, tableDefinition.identifier.table, tableDefinition.identifier.database) match {
             case Some(_) ⇒
                 /* no-op */
 
             case None ⇒
+                val schema = tableDefinition.identifier.database
+
+                if(schema.isDefined && !schema.contains(DFLT_SCHEMA) && !schema.contains(DEFAULT_DATABASE))
+                    throw new IgniteException("Can only create new tables in PUBLIC schema, not " + schema.get)
+
                 val props = tableDefinition.storage.properties
 
                 QueryHelper.createTable(tableDefinition.schema,
@@ -289,10 +280,8 @@ private[ignite] class IgniteExternalCatalog(defaultIgniteContext: IgniteContext)
     }
 
     /** @inheritdoc */
-    override protected def doDropTable(db: String, tabName: String, ignoreIfNotExists: Boolean, purge: Boolean): Unit = {
-        val ignite = igniteOrDefault(db, default)
-
-        igniteSQLTable(ignite, tabName) match {
+    override protected def doDropTable(db: String, tabName: String, ignoreIfNotExists: Boolean, purge: Boolean): Unit =
+        igniteSQLTable(ignite, tabName, Some(schemaOrDefault(db, currentSchema))) match {
             case Some(table) ⇒
                 val tableName = table.getTableName
 
@@ -302,7 +291,6 @@ private[ignite] class IgniteExternalCatalog(defaultIgniteContext: IgniteContext)
                 if (!ignoreIfNotExists)
                     throw new IgniteException(s"Table $tabName doesn't exists.")
         }
-    }
 
     /** @inheritdoc */
     override protected def doRenameTable(db: String, oldName: String, newName: String): Unit =
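
With this change setCurrentDatabase switches the current Ignite SQL schema
rather than the named Ignite instance. Per the catalog tests below:

    // Scope the catalog to a custom schema, then back to PUBLIC.
    igniteSession.catalog.setCurrentDatabase("employeeSchema")
    igniteSession.catalog.listTables().collect().map(_.name) // Array("EMPLOYEE")

    igniteSession.catalog.setCurrentDatabase("PUBLIC")
    igniteSession.catalog.listTables().collect().map(_.name) // Array("CITY", "PERSON")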

http://git-wip-us.apache.org/repos/asf/ignite/blob/b50a9dc1/modules/spark/src/main/scala/org/apache/spark/sql/ignite/IgniteOptimization.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/scala/org/apache/spark/sql/ignite/IgniteOptimization.scala b/modules/spark/src/main/scala/org/apache/spark/sql/ignite/IgniteOptimization.scala
index 2d97792..0455e86 100644
--- a/modules/spark/src/main/scala/org/apache/spark/sql/ignite/IgniteOptimization.scala
+++ b/modules/spark/src/main/scala/org/apache/spark/sql/ignite/IgniteOptimization.scala
@@ -73,7 +73,8 @@ object IgniteOptimization extends Rule[LogicalPlan] with Logging {
                     catalogTable = _catalogTable,
                     aliasIndex = aliasIndexIterator,
                     cacheName =
-                        sqlCacheName(igniteSqlRelation.ic.ignite(), igniteSqlRelation.tableName)
+                        sqlCacheName(igniteSqlRelation.ic.ignite(), igniteSqlRelation.tableName,
+                            igniteSqlRelation.schemaName)
                             .getOrElse(throw new IgniteException("Unknown table")))
 
                 //Logical Relation is bottomest TreeNode in LogicalPlan.

http://git-wip-us.apache.org/repos/asf/ignite/blob/b50a9dc1/modules/spark/src/test/scala/org/apache/ignite/spark/AbstractDataFrameSpec.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/test/scala/org/apache/ignite/spark/AbstractDataFrameSpec.scala b/modules/spark/src/test/scala/org/apache/ignite/spark/AbstractDataFrameSpec.scala
index a5ea25d..25d4a5c 100644
--- a/modules/spark/src/test/scala/org/apache/ignite/spark/AbstractDataFrameSpec.scala
+++ b/modules/spark/src/test/scala/org/apache/ignite/spark/AbstractDataFrameSpec.scala
@@ -117,9 +117,11 @@ abstract class AbstractDataFrameSpec extends FunSpec with Matchers with BeforeAn
         cache.query(qry.setArgs(4L.asInstanceOf[JLong], "St. Petersburg")).getAll
     }
 
-    def createEmployeeCache(client: Ignite, cacheName: String): Unit = {
+    def createEmployeeCache(client: Ignite, cacheName: String, schemaName: Option[String] = None): Unit = {
         val ccfg = AbstractDataFrameSpec.cacheConfiguration[String, Employee](cacheName)
 
+        schemaName.foreach(ccfg.setSqlSchema)
+
         val cache = client.getOrCreateCache(ccfg)
 
         cache.put("key1", Employee(1, "John Connor", 15, 0))

http://git-wip-us.apache.org/repos/asf/ignite/blob/b50a9dc1/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteCatalogSpec.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteCatalogSpec.scala b/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteCatalogSpec.scala
index d87d234..88e655d 100644
--- a/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteCatalogSpec.scala
+++ b/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteCatalogSpec.scala
@@ -19,12 +19,15 @@ package org.apache.ignite.spark
 
 import java.lang.{Long ⇒ JLong}
 
+import org.apache.ignite.IgniteException
 import org.apache.ignite.cache.query.SqlFieldsQuery
 import org.apache.ignite.internal.IgnitionEx
 import org.apache.ignite.internal.util.IgniteUtils.resolveIgnitePath
 import org.apache.ignite.spark.AbstractDataFrameSpec.{DEFAULT_CACHE, EMPLOYEE_CACHE_NAME, TEST_CONFIG_FILE, enclose}
+import org.apache.spark.sql.catalyst.catalog.SessionCatalog
 import org.apache.spark.sql.ignite.IgniteSparkSession
 import org.apache.spark.sql.types.{LongType, StringType}
+import org.junit.Assert.assertEquals
 import org.junit.runner.RunWith
 import org.scalatest.junit.JUnitRunner
 
@@ -39,9 +42,35 @@ class IgniteCatalogSpec extends AbstractDataFrameSpec {
         it("Should observe all available SQL tables") {
             val tables = igniteSession.catalog.listTables.collect()
 
-            tables.length should equal (3)
+            tables.length should equal(3)
 
-            tables.map(_.name).sorted should equal (Array("CITY", "EMPLOYEE", "PERSON"))
+            tables.map(_.name).sorted should equal(Array("CITY", "EMPLOYEE", "PERSON"))
+        }
+
+        it("Should use the database context when providing tables") {
+            igniteSession.catalog.setCurrentDatabase("employeeSchema")
+
+            val employeeSchemaTables = igniteSession.catalog.listTables().collect()
+
+            employeeSchemaTables.map(_.name).sorted should equal(Array("EMPLOYEE"))
+
+            igniteSession.catalog.setCurrentDatabase("PUBLIC")
+
+            val publicSchemaTables = igniteSession.catalog.listTables().collect()
+
+            publicSchemaTables.map(_.name).sorted should equal(Array("CITY", "PERSON"))
+        }
+
+        it("Should provide table names given the PUBLIC schema") {
+            val tables = igniteSession.catalog.listTables("PUBLIC").collect()
+
+            tables.map(_.name).sorted should equal(Array("CITY", "PERSON"))
+        }
+
+        it("Should provide table names given a custom schema") {
+            val tables = igniteSession.catalog.listTables("employeeSchema").collect()
+
+            tables.map(_.name).sorted should equal(Array("EMPLOYEE"))
         }
 
         it("Should provide correct schema for SQL table") {
@@ -55,6 +84,12 @@ class IgniteCatalogSpec extends AbstractDataFrameSpec {
                     ("NAME", StringType.catalogString, true)))
         }
 
+        it("Should provide the list of all schemas") {
+            val schemas = igniteSession.catalog.listDatabases().collect()
+
+            schemas.map(_.name).sorted should equal(Array("cache3", "employeeschema", "public"))
+        }
+
         it("Should provide ability to query SQL table without explicit registration") {
             val res = igniteSession.sql("SELECT id, name FROM city").rdd
 
@@ -126,6 +161,41 @@ class IgniteCatalogSpec extends AbstractDataFrameSpec {
                 )
             )
         }
+
+        it("Should allow schema specification in the table name for public schema") {
+            val res = igniteSession.sql("SELECT id, name FROM public.city").rdd
+
+            res.count should equal(4)
+        }
+
+        it("Should allow schema specification in the table name for non-public schema") {
+            val res = igniteSession.sql("SELECT id, name, salary FROM cache3.employee").rdd
+
+            res.count should equal(3)
+        }
+
+        it("Should allow Spark SQL to create a table") {
+            igniteSession.sql(
+                "CREATE TABLE NEW_SPARK_TABLE(id LONG, name STRING) USING JSON OPTIONS ('primaryKeyFields' = 'id')")
+
+            val tables = igniteSession.catalog.listTables.collect()
+
+            tables.find(_.name == "NEW_SPARK_TABLE").map(_.name) should equal (Some("NEW_SPARK_TABLE"))
+        }
+
+        it("Should disallow creation of tables in non-PUBLIC schemas") {
+            val ex = intercept[IgniteException] {
+                igniteSession.sql(
+                    "CREATE TABLE cache3.NEW_SPARK_TABLE(id LONG, name STRING) " +
+                        "USING JSON OPTIONS ('primaryKeyFields' = 'id')")
+            }
+
+            assertEquals(ex.getMessage, "Can only create new tables in PUBLIC schema, not cache3")
+        }
+    }
+
+    before {
+        igniteSession.catalog.setCurrentDatabase(SessionCatalog.DEFAULT_DATABASE)
     }
 
     override protected def beforeAll(): Unit = {
@@ -137,6 +207,8 @@ class IgniteCatalogSpec extends AbstractDataFrameSpec {
 
         createEmployeeCache(client, EMPLOYEE_CACHE_NAME)
 
+        createEmployeeCache(client, "myEmployeeCache", Some("employeeSchema"))
+
         val configProvider = enclose(null) (_ ⇒ () ⇒ {
             val cfg = IgnitionEx.loadConfiguration(TEST_CONFIG_FILE).get1()
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/b50a9dc1/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteSQLDataFrameSpec.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteSQLDataFrameSpec.scala b/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteSQLDataFrameSpec.scala
index 58c63b6..9593c8e 100644
--- a/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteSQLDataFrameSpec.scala
+++ b/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteSQLDataFrameSpec.scala
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.spark
 
+import com.google.common.collect.Iterators
 import org.apache.ignite.spark.AbstractDataFrameSpec.TEST_CONFIG_FILE
 import org.apache.ignite.spark.IgniteDataFrameSettings._
 import org.apache.spark.sql.DataFrame
@@ -270,6 +271,44 @@ class IgniteSQLDataFrameSpec extends AbstractDataFrameSpec {
             persons(0).getAs[Long]("city_id") should equal(2)
             persons(0).getAs[Long]("count(1)") should equal(3)
         }
+
+        it("should use the schema name where one is specified") {
+            // `employeeCache1` is created in the schema matching the name of the cache, i.e. `employeeCache1`.
+            createEmployeeCache(client, "employeeCache1")
+
+            spark.read
+                .format(FORMAT_IGNITE)
+                .option(OPTION_CONFIG_FILE, TEST_CONFIG_FILE)
+                .option(OPTION_TABLE, "employee")
+                .option(OPTION_SCHEMA, "employeeCache1")
+                .load()
+                .createOrReplaceTempView("employeeWithSchema")
+
+            // `employeeCache2` is created with a custom schema of `employeeSchema`.
+            createEmployeeCache(client, "employeeCache2", Some("employeeSchema"))
+
+            Iterators.size(client.cache("employeeCache2").iterator()) should equal(3)
+
+            // Remove a value from `employeeCache2` so that we know whether the select statement picks up the
+            // correct cache, i.e. it should now have 2 values compared to 3 in `employeeCache1`.
+            client.cache("employeeCache2").remove("key1") shouldBe true
+
+            spark.read
+                .format(FORMAT_IGNITE)
+                .option(OPTION_CONFIG_FILE, TEST_CONFIG_FILE)
+                .option(OPTION_TABLE, "employee")
+                .option(OPTION_SCHEMA, "employeeSchema")
+                .load()
+                .createOrReplaceTempView("employeeWithSchema2")
+
+            val res = spark.sqlContext.sql("SELECT id FROM employeeWithSchema").rdd
+
+            res.count should equal(3)
+
+            val res2 = spark.sqlContext.sql("SELECT id FROM employeeWithSchema2").rdd
+
+            res2.count should equal(2)
+        }
     }
 
     override protected def beforeAll(): Unit = {

http://git-wip-us.apache.org/repos/asf/ignite/blob/b50a9dc1/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteSQLDataFrameWriteSpec.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteSQLDataFrameWriteSpec.scala b/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteSQLDataFrameWriteSpec.scala
index 30df27e..8d6f6fb 100644
--- a/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteSQLDataFrameWriteSpec.scala
+++ b/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteSQLDataFrameWriteSpec.scala
@@ -24,6 +24,7 @@ import org.apache.ignite.spark.impl.sqlTableInfo
 import org.apache.ignite.testframework.GridTestUtils.resolveIgnitePath
 import org.apache.spark.sql.SaveMode.{Append, Ignore, Overwrite}
 import org.apache.spark.sql.{DataFrame, SaveMode}
+import org.junit.Assert.assertEquals
 import org.junit.runner.RunWith
 import org.scalatest.junit.JUnitRunner
 import org.apache.spark.sql.functions._
@@ -220,7 +221,7 @@ class IgniteSQLDataFrameWriteSpec extends AbstractDataFrameSpec {
                     .save()
             }
 
-            val tblInfo = sqlTableInfo[Any, Any](client, PERSON_TBL_NAME)
+            val tblInfo = sqlTableInfo[Any, Any](client, PERSON_TBL_NAME, None)
 
             assert(tblInfo.isDefined, s"Table $PERSON_TBL_NAME should exists.")
         }
@@ -236,7 +237,7 @@ class IgniteSQLDataFrameWriteSpec extends AbstractDataFrameSpec {
                     .save()
             }
 
-            val tblInfo = sqlTableInfo[Any, Any](client, PERSON_TBL_NAME)
+            val tblInfo = sqlTableInfo[Any, Any](client, PERSON_TBL_NAME, None)
 
             assert(tblInfo.isDefined, s"Table $PERSON_TBL_NAME should exists.")
         }
@@ -252,7 +253,7 @@ class IgniteSQLDataFrameWriteSpec extends AbstractDataFrameSpec {
                     .save()
             }
 
-            val tblInfo = sqlTableInfo[Any, Any](client, PERSON_TBL_NAME)
+            val tblInfo = sqlTableInfo[Any, Any](client, PERSON_TBL_NAME, None)
 
             assert(tblInfo.isDefined, s"Table $PERSON_TBL_NAME should exists.")
         }
@@ -332,6 +333,22 @@ class IgniteSQLDataFrameWriteSpec extends AbstractDataFrameSpec {
                     .save()
             }
         }
+
+        it("Should throw exception if saving data frame as a new table with non-PUBLIC schema") {
+            val ex = intercept[IgniteException] {
+                personDataFrame.write
+                    .format(FORMAT_IGNITE)
+                    .option(OPTION_CONFIG_FILE, TEST_CONFIG_FILE)
+                    .option(OPTION_TABLE, "nonexistant-table-name")
+                    .option(OPTION_CREATE_TABLE_PRIMARY_KEY_FIELDS, "id")
+                    .option(OPTION_SCHEMA, "mySchema")
+                    .save()
+            }
+
+            assertEquals(ex.getMessage,
+                "Creating new tables in schema mySchema is not valid, tables must only be created in " +
+                org.apache.ignite.internal.processors.query.QueryUtils.DFLT_SCHEMA)
+        }
     }
 
     override protected def beforeAll(): Unit = {