You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ni...@apache.org on 2019/01/21 08:36:26 UTC

[ignite] branch master updated: IGNITE-10314: Spark Data Frame integration should use up to date schema if user executes add/drop column DDL (#5598)

This is an automated email from the ASF dual-hosted git repository.

nizhikov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 0151668  IGNITE-10314: Spark Data Frame integration  should use up to date schema if user executes add/drop column DDL (#5598)
0151668 is described below

commit 015166804eb2d7fd596e8458a0eaf0a3c3f49edd
Author: ldzhjn <12...@bjtu.edu.cn>
AuthorDate: Mon Jan 21 16:36:17 2019 +0800

    IGNITE-10314: Spark Data Frame integration  should use up to date schema if user executes add/drop column DDL (#5598)
    
    Spark Data Frame integration should use up to date schema if user executes add/drop column DDL
---
 .../ignite/spark/impl/IgniteRelationProvider.scala |  2 +-
 .../ignite/spark/impl/IgniteSQLRelation.scala      | 12 ++--
 .../org/apache/ignite/spark/impl/QueryHelper.scala |  9 +--
 .../org/apache/ignite/spark/impl/package.scala     | 57 +++++++++-------
 .../spark/sql/ignite/IgniteExternalCatalog.scala   | 16 ++---
 .../ignite/spark/IgniteDataFrameSchemaSpec.scala   | 79 +++++++++++++++++++++-
 .../ignite/spark/IgniteSQLDataFrameWriteSpec.scala |  6 +-
 7 files changed, 134 insertions(+), 47 deletions(-)

diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteRelationProvider.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteRelationProvider.scala
index 71195d5..039ca63 100644
--- a/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteRelationProvider.scala
+++ b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteRelationProvider.scala
@@ -103,7 +103,7 @@ class IgniteRelationProvider extends RelationProvider
 
         val tblName = tableName(params)
 
-        val tblInfoOption = sqlTableInfo[Any, Any](ctx.ignite(), tblName, params.get(OPTION_SCHEMA))
+        val tblInfoOption = sqlTableInfo(ctx.ignite(), tblName, params.get(OPTION_SCHEMA))
 
         if (tblInfoOption.isDefined) {
             mode match {
diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteSQLRelation.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteSQLRelation.scala
index cabf311..c8d5122 100644
--- a/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteSQLRelation.scala
+++ b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteSQLRelation.scala
@@ -18,7 +18,7 @@
 package org.apache.ignite.spark.impl
 
 import org.apache.ignite.IgniteException
-import org.apache.ignite.cache.QueryEntity
+import org.apache.ignite.internal.processors.query.{GridQueryTypeDescriptor, QueryTypeDescriptorImpl}
 import org.apache.ignite.spark.{IgniteContext, IgniteRDD, impl}
 import org.apache.spark.Partition
 import org.apache.spark.internal.Logging
@@ -42,7 +42,7 @@ class IgniteSQLRelation[K, V](
       * @return Schema of Ignite SQL table.
       */
     override def schema: StructType =
-        igniteSQLTable(ic.ignite(), tableName, schemaName)
+        sqlTableInfo(ic.ignite(), tableName, schemaName)
             .map(IgniteSQLRelation.schema)
             .getOrElse(throw new IgniteException(s"Unknown table $tableName"))
 
@@ -113,15 +113,15 @@ object IgniteSQLRelation {
       * @param table Ignite table descirption.
       * @return Spark table descirption
       */
-    def schema(table: QueryEntity): StructType = {
+    def schema(table: GridQueryTypeDescriptor): StructType = {
         //Partition columns has to be in the end of list.
         //See `org.apache.spark.sql.catalyst.catalog.CatalogTable#partitionSchema`
-        val columns = table.getFields.toList.sortBy(c ⇒ isKeyColumn(table, c._1))
+        val columns = table.fields.toList.sortBy(c ⇒ isKeyColumn(table, c._1))
 
         StructType(columns.map { case (name, dataType) ⇒
             StructField(
-                name = table.getAliases.getOrDefault(name, name),
-                dataType = IgniteRDD.dataType(dataType, name),
+                name = table.asInstanceOf[QueryTypeDescriptorImpl].aliases.getOrDefault(name, name),
+                dataType = IgniteRDD.dataType(dataType.getName, name),
                 nullable = !isKeyColumn(table, name),
                 metadata = Metadata.empty)
         })
diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/impl/QueryHelper.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/QueryHelper.scala
index fc2a40d..f752b1a 100644
--- a/modules/spark/src/main/scala/org/apache/ignite/spark/impl/QueryHelper.scala
+++ b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/QueryHelper.scala
@@ -21,6 +21,7 @@ import org.apache.ignite.cache.query.SqlFieldsQuery
 import org.apache.ignite.spark.IgniteDataFrameSettings._
 import QueryUtils.{compileCreateTable, compileDropTable, compileInsert}
 import org.apache.ignite.internal.IgniteEx
+import org.apache.ignite.internal.processors.query.QueryTypeDescriptorImpl
 import org.apache.ignite.internal.processors.query.QueryUtils.DFLT_SCHEMA
 import org.apache.ignite.spark.IgniteContext
 import org.apache.ignite.{Ignite, IgniteException}
@@ -163,9 +164,9 @@ private[apache] object QueryHelper {
         streamerPerNodeBufferSize: Option[Int],
         streamerPerNodeParallelOperations: Option[Int]
     ): Unit = {
-        val tblInfo = sqlTableInfo[Any, Any](ctx.ignite(), tblName, schemaName).get
+        val tblInfo = sqlTableInfo(ctx.ignite(), tblName, schemaName).get.asInstanceOf[QueryTypeDescriptorImpl]
 
-        val streamer = ctx.ignite().dataStreamer(tblInfo._1.getName)
+        val streamer = ctx.ignite().dataStreamer(tblInfo.cacheName)
 
         streamerAllowOverwrite.foreach(v ⇒ streamer.allowOverwrite(v))
 
@@ -185,8 +186,8 @@ private[apache] object QueryHelper {
                     row.get(row.fieldIndex(f.name)).asInstanceOf[Object]
                 }
 
-                qryProcessor.streamUpdateQuery(tblInfo._1.getName,
-                    tblInfo._1.getSqlSchema, streamer, insertQry, args.toArray)
+                qryProcessor.streamUpdateQuery(tblInfo.cacheName,
+                    tblInfo.schemaName, streamer, insertQry, args.toArray)
             }
         }
         finally {
diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/impl/package.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/package.scala
index 36ec956..c41937a 100644
--- a/modules/spark/src/main/scala/org/apache/ignite/spark/impl/package.scala
+++ b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/package.scala
@@ -18,12 +18,14 @@
 package org.apache.ignite.spark
 
 import org.apache.commons.lang.StringUtils.equalsIgnoreCase
-import org.apache.ignite.{Ignite, Ignition}
-import org.apache.ignite.cache.{CacheMode, QueryEntity}
+import org.apache.ignite.cache.CacheMode
 import org.apache.ignite.cluster.ClusterNode
 import org.apache.ignite.configuration.CacheConfiguration
+import org.apache.ignite.internal.IgniteEx
+import org.apache.ignite.internal.processors.query.{GridQueryTypeDescriptor, QueryTypeDescriptorImpl}
 import org.apache.ignite.internal.processors.query.QueryUtils.normalizeSchemaName
 import org.apache.ignite.internal.util.lang.GridFunc.contains
+import org.apache.ignite.{Ignite, Ignition}
 import org.apache.spark.Partition
 import org.apache.spark.sql.catalyst.catalog.SessionCatalog
 
@@ -75,25 +77,16 @@ package object impl {
       * @param ignite Ignite instance.
       * @param tabName Table name.
       * @param schemaName Optional schema name.
-      * @return QueryEntity for a given table.
-      */
-    def igniteSQLTable(ignite: Ignite, tabName: String, schemaName: Option[String]): Option[QueryEntity] =
-        sqlTableInfo[Any, Any](ignite, tabName, schemaName).map(_._2)
-
-    /**
-      * @param ignite Ignite instance.
-      * @param tabName Table name.
-      * @param schemaName Optional schema name.
       * @return Cache name for given table.
       */
     def sqlCacheName(ignite: Ignite, tabName: String, schemaName: Option[String]): Option[String] =
-        sqlTableInfo[Any, Any](ignite, tabName, schemaName).map(_._1.getName)
+        sqlTableInfo(ignite, tabName, schemaName).map(_.asInstanceOf[QueryTypeDescriptorImpl].cacheName)
 
     /**
       * @param ignite Ignite instance.
       * @return All schemas in given Ignite instance.
       */
-    def allSchemas(ignite: Ignite): Seq[String] = ignite.cacheNames()
+    def allSchemas(ignite: Ignite): Seq[String] = ignite.cacheNames
         .map(name =>
             normalizeSchemaName(name,
                 ignite.cache[Any,Any](name).getConfiguration(classOf[CacheConfiguration[Any,Any]]).getSqlSchema))
@@ -117,24 +110,40 @@ package object impl {
       * @param ignite Ignite instance.
       * @param tabName Table name.
       * @param schemaName Optional schema name.
-      * @tparam K Key class.
-      * @tparam V Value class.
-      * @return CacheConfiguration and QueryEntity for a given table.
+      * @return GridQueryTypeDescriptor for a given table.
       */
-    def sqlTableInfo[K, V](ignite: Ignite, tabName: String,
-        schemaName: Option[String]): Option[(CacheConfiguration[K, V], QueryEntity)] =
-        cachesForSchema[K,V](ignite, schemaName)
-            .map(ccfg ⇒ ccfg.getQueryEntities.find(_.getTableName.equalsIgnoreCase(tabName)).map(qe ⇒ (ccfg, qe)))
-            .find(_.isDefined)
-            .flatten
+    def sqlTableInfo(ignite: Ignite, tabName: String, schemaName: Option[String]): Option[GridQueryTypeDescriptor] =
+        ignite.asInstanceOf[IgniteEx].context.cache.publicCacheNames
+            .flatMap(cacheName => ignite.asInstanceOf[IgniteEx].context.query.types(cacheName))
+            .find(table => table.tableName.equalsIgnoreCase(tabName) && isValidSchema(table, schemaName))
+
+    /**
+      * @param table GridQueryTypeDescriptor for a given table.
+      * @param schemaName Optional schema name.
+      * @return `True` if schema is valid.
+      */
+    def isValidSchema(table: GridQueryTypeDescriptor, schemaName: Option[String]): Boolean =
+        schemaName match {
+            case Some(schema) =>
+                schema.equalsIgnoreCase(table.schemaName) || schema.equals(SessionCatalog.DEFAULT_DATABASE)
+            case None =>
+                true
+        }
 
     /**
       * @param table Table.
       * @param column Column name.
       * @return `True` if column is key.
       */
-    def isKeyColumn(table: QueryEntity, column: String): Boolean =
-        contains(table.getKeyFields, column) || equalsIgnoreCase(table.getKeyFieldName, column)
+    def isKeyColumn(table: GridQueryTypeDescriptor, column: String): Boolean =
+        contains(allKeyFields(table), column) || equalsIgnoreCase(table.keyFieldName, column)
+
+    /**
+      * @param table Table.
+      * @return All the key fields in a Set.
+      */
+    def allKeyFields(table: GridQueryTypeDescriptor): scala.collection.Set[String] =
+        table.fields.filter(entry => table.property(entry._1).key).keySet
 
     /**
       * Computes spark partitions for a given cache.
diff --git a/modules/spark/src/main/scala/org/apache/spark/sql/ignite/IgniteExternalCatalog.scala b/modules/spark/src/main/scala/org/apache/spark/sql/ignite/IgniteExternalCatalog.scala
index 9e8290c..b575840 100644
--- a/modules/spark/src/main/scala/org/apache/spark/sql/ignite/IgniteExternalCatalog.scala
+++ b/modules/spark/src/main/scala/org/apache/spark/sql/ignite/IgniteExternalCatalog.scala
@@ -97,9 +97,9 @@ private[ignite] class IgniteExternalCatalog(igniteContext: IgniteContext)
 
         val schemaName = schemaOrDefault(db, currentSchema)
 
-        igniteSQLTable(ignite, tabName, Some(db)) match {
+        sqlTableInfo(ignite, tabName, Some(db)) match {
             case Some(table) ⇒
-                val tableName = table.getTableName
+                val tableName = table.tableName
 
                 Some(new CatalogTable(
                     identifier = new TableIdentifier(tableName, Some(schemaName)),
@@ -117,10 +117,10 @@ private[ignite] class IgniteExternalCatalog(igniteContext: IgniteContext)
                     schema = schema(table),
                     provider = Some(FORMAT_IGNITE),
                     partitionColumnNames =
-                        if (table.getKeyFields != null)
-                            table.getKeyFields.toSeq
+                        if (!allKeyFields(table).isEmpty)
+                            allKeyFields(table).toSeq
                         else
-                            Seq(table.getKeyFieldName),
+                            Seq(table.keyFieldName),
                     bucketSpec = None))
             case None ⇒ None
         }
@@ -259,7 +259,7 @@ private[ignite] class IgniteExternalCatalog(igniteContext: IgniteContext)
 
     /** @inheritdoc */
     override protected def doCreateTable(tableDefinition: CatalogTable, ignoreIfExists: Boolean): Unit = {
-        igniteSQLTable(ignite, tableDefinition.identifier.table, tableDefinition.identifier.database) match {
+        sqlTableInfo(ignite, tableDefinition.identifier.table, tableDefinition.identifier.database) match {
             case Some(_) ⇒
                 /* no-op */
 
@@ -281,9 +281,9 @@ private[ignite] class IgniteExternalCatalog(igniteContext: IgniteContext)
 
     /** @inheritdoc */
     override protected def doDropTable(db: String, tabName: String, ignoreIfNotExists: Boolean, purge: Boolean): Unit =
-        igniteSQLTable(ignite, tabName, Some(schemaOrDefault(db, currentSchema))) match {
+        sqlTableInfo(ignite, tabName, Some(schemaOrDefault(db, currentSchema))) match {
             case Some(table) ⇒
-                val tableName = table.getTableName
+                val tableName = table.tableName
 
                 QueryHelper.dropTable(tableName, ignite)
 
diff --git a/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteDataFrameSchemaSpec.scala b/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteDataFrameSchemaSpec.scala
index b071008..795f019 100644
--- a/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteDataFrameSchemaSpec.scala
+++ b/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteDataFrameSchemaSpec.scala
@@ -17,6 +17,8 @@
 
 package org.apache.ignite.spark
 
+import org.apache.ignite.Ignite
+import org.apache.ignite.cache.query.SqlFieldsQuery
 import org.apache.ignite.cache.query.annotations.QuerySqlField
 import org.apache.ignite.configuration.CacheConfiguration
 import org.apache.ignite.spark.AbstractDataFrameSpec._
@@ -39,6 +41,12 @@ class IgniteDataFrameSchemaSpec extends AbstractDataFrameSpec {
 
     var personWithAliasesDataFrame: DataFrame = _
 
+    var columnMetaDataFrame: DataFrame = _
+
+    var addedColumnDataFrame: DataFrame = _
+
+    var droppedColumnDataFrame: DataFrame = _
+
     describe("Loading DataFrame schema for Ignite tables") {
         it("should successfully load DataFrame schema for a Ignite SQL Table") {
             personDataFrame.schema.fields.map(f ⇒ (f.name, f.dataType, f.nullable)) should equal (
@@ -55,6 +63,52 @@ class IgniteDataFrameSchemaSpec extends AbstractDataFrameSpec {
             )
         }
 
+        it("should show correct schema for a Ignite SQL Table with modified column") {
+            columnMetaDataFrame = spark.read
+                .format(FORMAT_IGNITE)
+                .option(OPTION_CONFIG_FILE, TEST_CONFIG_FILE)
+                .option(OPTION_TABLE, "test")
+                .load()
+
+            columnMetaDataFrame.schema.fields.map(f ⇒ (f.name, f.dataType, f.nullable)) should equal (
+                Array(
+                    ("A", IntegerType, true),
+                    ("B", StringType, true),
+                    ("ID", IntegerType, false))
+            )
+
+            addColumnForTable(client, DEFAULT_CACHE)
+
+            addedColumnDataFrame = spark.read
+                .format(FORMAT_IGNITE)
+                .option(OPTION_CONFIG_FILE, TEST_CONFIG_FILE)
+                .option(OPTION_TABLE, "test")
+                .load()
+
+            addedColumnDataFrame.schema.fields.map(f ⇒ (f.name, f.dataType, f.nullable)) should equal (
+                Array(
+                    ("A", IntegerType, true),
+                    ("B", StringType, true),
+                    ("C", IntegerType, true),
+                    ("ID", IntegerType, false))
+            )
+
+            dropColumnFromTable(client, DEFAULT_CACHE)
+
+            droppedColumnDataFrame = spark.read
+                .format(FORMAT_IGNITE)
+                .option(OPTION_CONFIG_FILE, TEST_CONFIG_FILE)
+                .option(OPTION_TABLE, "test")
+                .load()
+
+            droppedColumnDataFrame.schema.fields.map(f ⇒ (f.name, f.dataType, f.nullable)) should equal (
+                Array(
+                    ("B", StringType, true),
+                    ("C", IntegerType, true),
+                    ("ID", IntegerType, false))
+            )
+        }
+
         it("should successfully load DataFrame data for a Ignite table configured throw java annotation") {
             employeeDataFrame.schema.fields.map(f ⇒ (f.name, f.dataType, f.nullable)) should equal (
                 Array(
@@ -64,7 +118,7 @@ class IgniteDataFrameSchemaSpec extends AbstractDataFrameSpec {
             )
         }
 
-        it("should use QueryEntity column aliases") {
+        it("should use GridQueryTypeDescriptor column aliases") {
             personWithAliasesDataFrame.schema.fields.map(f ⇒ (f.name, f.dataType, f.nullable)) should equal (
                 Array(
                     ("ID", LongType, true),
@@ -88,6 +142,8 @@ class IgniteDataFrameSchemaSpec extends AbstractDataFrameSpec {
 
         createPersonTable(client, DEFAULT_CACHE)
 
+        createMetaTestTable(client, DEFAULT_CACHE)
+
         createEmployeeCache(client, EMPLOYEE_CACHE_NAME)
 
         personDataFrame = spark.read
@@ -107,6 +163,27 @@ class IgniteDataFrameSchemaSpec extends AbstractDataFrameSpec {
         employeeDataFrame.createOrReplaceTempView("employee")
     }
 
+    def createMetaTestTable(client: Ignite, cacheName: String): Unit = {
+        val cache = client.cache(cacheName)
+
+        cache.query(new SqlFieldsQuery(
+            "CREATE TABLE test (id INT PRIMARY KEY, a INT, b CHAR)")).getAll
+    }
+
+    def addColumnForTable(client: Ignite, cacheName: String): Unit = {
+        val cache = client.cache(cacheName)
+
+        cache.query(new SqlFieldsQuery(
+            "ALTER TABLE test ADD COLUMN c int")).getAll
+    }
+
+    def dropColumnFromTable(client: Ignite, cacheName: String): Unit = {
+        val cache = client.cache(cacheName)
+
+        cache.query(new SqlFieldsQuery(
+            "ALTER TABLE test DROP COLUMN a")).getAll
+    }
+
     case class JPersonWithAlias(
         @(QuerySqlField @field) id: Long,
         @(QuerySqlField @field)(name = "person_name", index = true) name: String)
diff --git a/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteSQLDataFrameWriteSpec.scala b/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteSQLDataFrameWriteSpec.scala
index 8d6f6fb..6480507 100644
--- a/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteSQLDataFrameWriteSpec.scala
+++ b/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteSQLDataFrameWriteSpec.scala
@@ -221,7 +221,7 @@ class IgniteSQLDataFrameWriteSpec extends AbstractDataFrameSpec {
                     .save()
             }
 
-            val tblInfo = sqlTableInfo[Any, Any](client, PERSON_TBL_NAME, None)
+            val tblInfo = sqlTableInfo(client, PERSON_TBL_NAME, None)
 
             assert(tblInfo.isDefined, s"Table $PERSON_TBL_NAME should exists.")
         }
@@ -237,7 +237,7 @@ class IgniteSQLDataFrameWriteSpec extends AbstractDataFrameSpec {
                     .save()
             }
 
-            val tblInfo = sqlTableInfo[Any, Any](client, PERSON_TBL_NAME, None)
+            val tblInfo = sqlTableInfo(client, PERSON_TBL_NAME, None)
 
             assert(tblInfo.isDefined, s"Table $PERSON_TBL_NAME should exists.")
         }
@@ -253,7 +253,7 @@ class IgniteSQLDataFrameWriteSpec extends AbstractDataFrameSpec {
                     .save()
             }
 
-            val tblInfo = sqlTableInfo[Any, Any](client, PERSON_TBL_NAME, None)
+            val tblInfo = sqlTableInfo(client, PERSON_TBL_NAME, None)
 
             assert(tblInfo.isDefined, s"Table $PERSON_TBL_NAME should exists.")
         }