Posted to commits@ignite.apache.org by ni...@apache.org on 2019/01/21 08:36:26 UTC
[ignite] branch master updated: IGNITE-10314: Spark Data Frame integration should use up to date schema if user executes add/drop column DDL (#5598)
This is an automated email from the ASF dual-hosted git repository.
nizhikov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 0151668 IGNITE-10314: Spark Data Frame integration should use up to date schema if user executes add/drop column DDL (#5598)
0151668 is described below
commit 015166804eb2d7fd596e8458a0eaf0a3c3f49edd
Author: ldzhjn <12...@bjtu.edu.cn>
AuthorDate: Mon Jan 21 16:36:17 2019 +0800
IGNITE-10314: Spark Data Frame integration should use up to date schema if user executes add/drop column DDL (#5598)
Spark Data Frame integration should use up to date schema if user executes add/drop column DDL
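For context, this is the user-visible scenario the patch fixes, condensed from the test added below. A minimal sketch, assuming an Ignite client `client` and a SparkSession `spark` wired up as in AbstractDataFrameSpec, with DEFAULT_CACHE, TEST_CONFIG_FILE and the option constants imported from the test helpers and IgniteDataFrameSettings:

    import org.apache.ignite.cache.query.SqlFieldsQuery

    // Create a table, then change its shape with DDL afterwards.
    val cache = client.cache(DEFAULT_CACHE)
    cache.query(new SqlFieldsQuery(
      "CREATE TABLE test (id INT PRIMARY KEY, a INT, b CHAR)")).getAll
    cache.query(new SqlFieldsQuery(
      "ALTER TABLE test ADD COLUMN c INT")).getAll

    // Re-read the table. Before this patch the schema was built from the
    // static QueryEntity captured in the cache configuration, so column C
    // was missing; now it comes from the live GridQueryTypeDescriptor.
    val df = spark.read
      .format(FORMAT_IGNITE)
      .option(OPTION_CONFIG_FILE, TEST_CONFIG_FILE)
      .option(OPTION_TABLE, "test")
      .load()

    df.printSchema() // prints A, B, C and ID; C present, ID a non-nullable key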
---
.../ignite/spark/impl/IgniteRelationProvider.scala | 2 +-
.../ignite/spark/impl/IgniteSQLRelation.scala | 12 ++--
.../org/apache/ignite/spark/impl/QueryHelper.scala | 9 +--
.../org/apache/ignite/spark/impl/package.scala | 57 +++++++++-------
.../spark/sql/ignite/IgniteExternalCatalog.scala | 16 ++---
.../ignite/spark/IgniteDataFrameSchemaSpec.scala | 79 +++++++++++++++++++++-
.../ignite/spark/IgniteSQLDataFrameWriteSpec.scala | 6 +-
7 files changed, 134 insertions(+), 47 deletions(-)
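The heart of the change is in package.scala. sqlTableInfo used to return the (CacheConfiguration, QueryEntity) pair taken from the static cache configuration, which is fixed when the cache starts and never reflects ALTER TABLE ADD/DROP COLUMN DDL. It now asks the running query processor for its live type descriptors instead, so the schema Spark sees always matches the current state of the table. The new lookup, reproduced from the diff below:

    def sqlTableInfo(ignite: Ignite, tabName: String, schemaName: Option[String]): Option[GridQueryTypeDescriptor] =
      ignite.asInstanceOf[IgniteEx].context.cache.publicCacheNames
        .flatMap(cacheName => ignite.asInstanceOf[IgniteEx].context.query.types(cacheName))
        .find(table => table.tableName.equalsIgnoreCase(tabName) && isValidSchema(table, schemaName))

The remaining files follow from that signature change: igniteSQLTable is removed in favor of calling sqlTableInfo directly, and IgniteSQLRelation, QueryHelper and IgniteExternalCatalog are rewritten against GridQueryTypeDescriptor.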
diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteRelationProvider.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteRelationProvider.scala
index 71195d5..039ca63 100644
--- a/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteRelationProvider.scala
+++ b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteRelationProvider.scala
@@ -103,7 +103,7 @@ class IgniteRelationProvider extends RelationProvider
val tblName = tableName(params)
- val tblInfoOption = sqlTableInfo[Any, Any](ctx.ignite(), tblName, params.get(OPTION_SCHEMA))
+ val tblInfoOption = sqlTableInfo(ctx.ignite(), tblName, params.get(OPTION_SCHEMA))
if (tblInfoOption.isDefined) {
mode match {
diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteSQLRelation.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteSQLRelation.scala
index cabf311..c8d5122 100644
--- a/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteSQLRelation.scala
+++ b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteSQLRelation.scala
@@ -18,7 +18,7 @@
package org.apache.ignite.spark.impl
import org.apache.ignite.IgniteException
-import org.apache.ignite.cache.QueryEntity
+import org.apache.ignite.internal.processors.query.{GridQueryTypeDescriptor, QueryTypeDescriptorImpl}
import org.apache.ignite.spark.{IgniteContext, IgniteRDD, impl}
import org.apache.spark.Partition
import org.apache.spark.internal.Logging
@@ -42,7 +42,7 @@ class IgniteSQLRelation[K, V](
* @return Schema of Ignite SQL table.
*/
override def schema: StructType =
- igniteSQLTable(ic.ignite(), tableName, schemaName)
+ sqlTableInfo(ic.ignite(), tableName, schemaName)
.map(IgniteSQLRelation.schema)
.getOrElse(throw new IgniteException(s"Unknown table $tableName"))
@@ -113,15 +113,15 @@ object IgniteSQLRelation {
* @param table Ignite table description.
* @return Spark table description.
*/
- def schema(table: QueryEntity): StructType = {
+ def schema(table: GridQueryTypeDescriptor): StructType = {
//Partition columns have to be at the end of the list.
//See `org.apache.spark.sql.catalyst.catalog.CatalogTable#partitionSchema`
- val columns = table.getFields.toList.sortBy(c ⇒ isKeyColumn(table, c._1))
+ val columns = table.fields.toList.sortBy(c ⇒ isKeyColumn(table, c._1))
StructType(columns.map { case (name, dataType) ⇒
StructField(
- name = table.getAliases.getOrDefault(name, name),
- dataType = IgniteRDD.dataType(dataType, name),
+ name = table.asInstanceOf[QueryTypeDescriptorImpl].aliases.getOrDefault(name, name),
+ dataType = IgniteRDD.dataType(dataType.getName, name),
nullable = !isKeyColumn(table, name),
metadata = Metadata.empty)
})
diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/impl/QueryHelper.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/QueryHelper.scala
index fc2a40d..f752b1a 100644
--- a/modules/spark/src/main/scala/org/apache/ignite/spark/impl/QueryHelper.scala
+++ b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/QueryHelper.scala
@@ -21,6 +21,7 @@ import org.apache.ignite.cache.query.SqlFieldsQuery
import org.apache.ignite.spark.IgniteDataFrameSettings._
import QueryUtils.{compileCreateTable, compileDropTable, compileInsert}
import org.apache.ignite.internal.IgniteEx
+import org.apache.ignite.internal.processors.query.QueryTypeDescriptorImpl
import org.apache.ignite.internal.processors.query.QueryUtils.DFLT_SCHEMA
import org.apache.ignite.spark.IgniteContext
import org.apache.ignite.{Ignite, IgniteException}
@@ -163,9 +164,9 @@ private[apache] object QueryHelper {
streamerPerNodeBufferSize: Option[Int],
streamerPerNodeParallelOperations: Option[Int]
): Unit = {
- val tblInfo = sqlTableInfo[Any, Any](ctx.ignite(), tblName, schemaName).get
+ val tblInfo = sqlTableInfo(ctx.ignite(), tblName, schemaName).get.asInstanceOf[QueryTypeDescriptorImpl]
- val streamer = ctx.ignite().dataStreamer(tblInfo._1.getName)
+ val streamer = ctx.ignite().dataStreamer(tblInfo.cacheName)
streamerAllowOverwrite.foreach(v ⇒ streamer.allowOverwrite(v))
@@ -185,8 +186,8 @@ private[apache] object QueryHelper {
row.get(row.fieldIndex(f.name)).asInstanceOf[Object]
}
- qryProcessor.streamUpdateQuery(tblInfo._1.getName,
- tblInfo._1.getSqlSchema, streamer, insertQry, args.toArray)
+ qryProcessor.streamUpdateQuery(tblInfo.cacheName,
+ tblInfo.schemaName, streamer, insertQry, args.toArray)
}
}
finally {
diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/impl/package.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/package.scala
index 36ec956..c41937a 100644
--- a/modules/spark/src/main/scala/org/apache/ignite/spark/impl/package.scala
+++ b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/package.scala
@@ -18,12 +18,14 @@
package org.apache.ignite.spark
import org.apache.commons.lang.StringUtils.equalsIgnoreCase
-import org.apache.ignite.{Ignite, Ignition}
-import org.apache.ignite.cache.{CacheMode, QueryEntity}
+import org.apache.ignite.cache.CacheMode
import org.apache.ignite.cluster.ClusterNode
import org.apache.ignite.configuration.CacheConfiguration
+import org.apache.ignite.internal.IgniteEx
+import org.apache.ignite.internal.processors.query.{GridQueryTypeDescriptor, QueryTypeDescriptorImpl}
import org.apache.ignite.internal.processors.query.QueryUtils.normalizeSchemaName
import org.apache.ignite.internal.util.lang.GridFunc.contains
+import org.apache.ignite.{Ignite, Ignition}
import org.apache.spark.Partition
import org.apache.spark.sql.catalyst.catalog.SessionCatalog
@@ -75,25 +77,16 @@ package object impl {
* @param ignite Ignite instance.
* @param tabName Table name.
* @param schemaName Optional schema name.
- * @return QueryEntity for a given table.
- */
- def igniteSQLTable(ignite: Ignite, tabName: String, schemaName: Option[String]): Option[QueryEntity] =
- sqlTableInfo[Any, Any](ignite, tabName, schemaName).map(_._2)
-
- /**
- * @param ignite Ignite instance.
- * @param tabName Table name.
- * @param schemaName Optional schema name.
* @return Cache name for given table.
*/
def sqlCacheName(ignite: Ignite, tabName: String, schemaName: Option[String]): Option[String] =
- sqlTableInfo[Any, Any](ignite, tabName, schemaName).map(_._1.getName)
+ sqlTableInfo(ignite, tabName, schemaName).map(_.asInstanceOf[QueryTypeDescriptorImpl].cacheName)
/**
* @param ignite Ignite instance.
* @return All schemas in given Ignite instance.
*/
- def allSchemas(ignite: Ignite): Seq[String] = ignite.cacheNames()
+ def allSchemas(ignite: Ignite): Seq[String] = ignite.cacheNames
.map(name =>
normalizeSchemaName(name,
ignite.cache[Any,Any](name).getConfiguration(classOf[CacheConfiguration[Any,Any]]).getSqlSchema))
@@ -117,24 +110,40 @@ package object impl {
* @param ignite Ignite instance.
* @param tabName Table name.
* @param schemaName Optional schema name.
- * @tparam K Key class.
- * @tparam V Value class.
- * @return CacheConfiguration and QueryEntity for a given table.
+ * @return GridQueryTypeDescriptor for a given table.
*/
- def sqlTableInfo[K, V](ignite: Ignite, tabName: String,
- schemaName: Option[String]): Option[(CacheConfiguration[K, V], QueryEntity)] =
- cachesForSchema[K,V](ignite, schemaName)
- .map(ccfg ⇒ ccfg.getQueryEntities.find(_.getTableName.equalsIgnoreCase(tabName)).map(qe ⇒ (ccfg, qe)))
- .find(_.isDefined)
- .flatten
+ def sqlTableInfo(ignite: Ignite, tabName: String, schemaName: Option[String]): Option[GridQueryTypeDescriptor] =
+ ignite.asInstanceOf[IgniteEx].context.cache.publicCacheNames
+ .flatMap(cacheName => ignite.asInstanceOf[IgniteEx].context.query.types(cacheName))
+ .find(table => table.tableName.equalsIgnoreCase(tabName) && isValidSchema(table, schemaName))
+
+ /**
+ * @param table GridQueryTypeDescriptor for a given table.
+ * @param schemaName Optional schema name.
+ * @return `True` if schema is valid.
+ */
+ def isValidSchema(table: GridQueryTypeDescriptor, schemaName: Option[String]): Boolean =
+ schemaName match {
+ case Some(schema) =>
+ schema.equalsIgnoreCase(table.schemaName) || schema.equals(SessionCatalog.DEFAULT_DATABASE)
+ case None =>
+ true
+ }
/**
* @param table Table.
* @param column Column name.
* @return `True` if column is key.
*/
- def isKeyColumn(table: QueryEntity, column: String): Boolean =
- contains(table.getKeyFields, column) || equalsIgnoreCase(table.getKeyFieldName, column)
+ def isKeyColumn(table: GridQueryTypeDescriptor, column: String): Boolean =
+ contains(allKeyFields(table), column) || equalsIgnoreCase(table.keyFieldName, column)
+
+ /**
+ * @param table Table.
+ * @return All the key fields in a Set.
+ */
+ def allKeyFields(table: GridQueryTypeDescriptor): scala.collection.Set[String] =
+ table.fields.filter(entry => table.property(entry._1).key).keySet
/**
* Computes spark partitions for a given cache.
diff --git a/modules/spark/src/main/scala/org/apache/spark/sql/ignite/IgniteExternalCatalog.scala b/modules/spark/src/main/scala/org/apache/spark/sql/ignite/IgniteExternalCatalog.scala
index 9e8290c..b575840 100644
--- a/modules/spark/src/main/scala/org/apache/spark/sql/ignite/IgniteExternalCatalog.scala
+++ b/modules/spark/src/main/scala/org/apache/spark/sql/ignite/IgniteExternalCatalog.scala
@@ -97,9 +97,9 @@ private[ignite] class IgniteExternalCatalog(igniteContext: IgniteContext)
val schemaName = schemaOrDefault(db, currentSchema)
- igniteSQLTable(ignite, tabName, Some(db)) match {
+ sqlTableInfo(ignite, tabName, Some(db)) match {
case Some(table) ⇒
- val tableName = table.getTableName
+ val tableName = table.tableName
Some(new CatalogTable(
identifier = new TableIdentifier(tableName, Some(schemaName)),
@@ -117,10 +117,10 @@ private[ignite] class IgniteExternalCatalog(igniteContext: IgniteContext)
schema = schema(table),
provider = Some(FORMAT_IGNITE),
partitionColumnNames =
- if (table.getKeyFields != null)
- table.getKeyFields.toSeq
+ if (!allKeyFields(table).isEmpty)
+ allKeyFields(table).toSeq
else
- Seq(table.getKeyFieldName),
+ Seq(table.keyFieldName),
bucketSpec = None))
case None ⇒ None
}
@@ -259,7 +259,7 @@ private[ignite] class IgniteExternalCatalog(igniteContext: IgniteContext)
/** @inheritdoc */
override protected def doCreateTable(tableDefinition: CatalogTable, ignoreIfExists: Boolean): Unit = {
- igniteSQLTable(ignite, tableDefinition.identifier.table, tableDefinition.identifier.database) match {
+ sqlTableInfo(ignite, tableDefinition.identifier.table, tableDefinition.identifier.database) match {
case Some(_) ⇒
/* no-op */
@@ -281,9 +281,9 @@ private[ignite] class IgniteExternalCatalog(igniteContext: IgniteContext)
/** @inheritdoc */
override protected def doDropTable(db: String, tabName: String, ignoreIfNotExists: Boolean, purge: Boolean): Unit =
- igniteSQLTable(ignite, tabName, Some(schemaOrDefault(db, currentSchema))) match {
+ sqlTableInfo(ignite, tabName, Some(schemaOrDefault(db, currentSchema))) match {
case Some(table) ⇒
- val tableName = table.getTableName
+ val tableName = table.tableName
QueryHelper.dropTable(tableName, ignite)
diff --git a/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteDataFrameSchemaSpec.scala b/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteDataFrameSchemaSpec.scala
index b071008..795f019 100644
--- a/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteDataFrameSchemaSpec.scala
+++ b/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteDataFrameSchemaSpec.scala
@@ -17,6 +17,8 @@
package org.apache.ignite.spark
+import org.apache.ignite.Ignite
+import org.apache.ignite.cache.query.SqlFieldsQuery
import org.apache.ignite.cache.query.annotations.QuerySqlField
import org.apache.ignite.configuration.CacheConfiguration
import org.apache.ignite.spark.AbstractDataFrameSpec._
@@ -39,6 +41,12 @@ class IgniteDataFrameSchemaSpec extends AbstractDataFrameSpec {
var personWithAliasesDataFrame: DataFrame = _
+ var columnMetaDataFrame: DataFrame = _
+
+ var addedColumnDataFrame: DataFrame = _
+
+ var droppedColumnDataFrame: DataFrame = _
+
describe("Loading DataFrame schema for Ignite tables") {
it("should successfully load DataFrame schema for a Ignite SQL Table") {
personDataFrame.schema.fields.map(f ⇒ (f.name, f.dataType, f.nullable)) should equal (
@@ -55,6 +63,52 @@ class IgniteDataFrameSchemaSpec extends AbstractDataFrameSpec {
)
}
+ it("should show correct schema for a Ignite SQL Table with modified column") {
+ columnMetaDataFrame = spark.read
+ .format(FORMAT_IGNITE)
+ .option(OPTION_CONFIG_FILE, TEST_CONFIG_FILE)
+ .option(OPTION_TABLE, "test")
+ .load()
+
+ columnMetaDataFrame.schema.fields.map(f ⇒ (f.name, f.dataType, f.nullable)) should equal (
+ Array(
+ ("A", IntegerType, true),
+ ("B", StringType, true),
+ ("ID", IntegerType, false))
+ )
+
+ addColumnForTable(client, DEFAULT_CACHE)
+
+ addedColumnDataFrame = spark.read
+ .format(FORMAT_IGNITE)
+ .option(OPTION_CONFIG_FILE, TEST_CONFIG_FILE)
+ .option(OPTION_TABLE, "test")
+ .load()
+
+ addedColumnDataFrame.schema.fields.map(f ⇒ (f.name, f.dataType, f.nullable)) should equal (
+ Array(
+ ("A", IntegerType, true),
+ ("B", StringType, true),
+ ("C", IntegerType, true),
+ ("ID", IntegerType, false))
+ )
+
+ dropColumnFromTable(client, DEFAULT_CACHE)
+
+ droppedColumnDataFrame = spark.read
+ .format(FORMAT_IGNITE)
+ .option(OPTION_CONFIG_FILE, TEST_CONFIG_FILE)
+ .option(OPTION_TABLE, "test")
+ .load()
+
+ droppedColumnDataFrame.schema.fields.map(f ⇒ (f.name, f.dataType, f.nullable)) should equal (
+ Array(
+ ("B", StringType, true),
+ ("C", IntegerType, true),
+ ("ID", IntegerType, false))
+ )
+ }
+
it("should successfully load DataFrame data for a Ignite table configured throw java annotation") {
employeeDataFrame.schema.fields.map(f ⇒ (f.name, f.dataType, f.nullable)) should equal (
Array(
@@ -64,7 +118,7 @@ class IgniteDataFrameSchemaSpec extends AbstractDataFrameSpec {
)
}
- it("should use QueryEntity column aliases") {
+ it("should use GridQueryTypeDescriptor column aliases") {
personWithAliasesDataFrame.schema.fields.map(f ⇒ (f.name, f.dataType, f.nullable)) should equal (
Array(
("ID", LongType, true),
@@ -88,6 +142,8 @@ class IgniteDataFrameSchemaSpec extends AbstractDataFrameSpec {
createPersonTable(client, DEFAULT_CACHE)
+ createMetaTestTable(client, DEFAULT_CACHE)
+
createEmployeeCache(client, EMPLOYEE_CACHE_NAME)
personDataFrame = spark.read
@@ -107,6 +163,27 @@ class IgniteDataFrameSchemaSpec extends AbstractDataFrameSpec {
employeeDataFrame.createOrReplaceTempView("employee")
}
+ def createMetaTestTable(client: Ignite, cacheName: String): Unit = {
+ val cache = client.cache(cacheName)
+
+ cache.query(new SqlFieldsQuery(
+ "CREATE TABLE test (id INT PRIMARY KEY, a INT, b CHAR)")).getAll
+ }
+
+ def addColumnForTable(client: Ignite, cacheName: String): Unit = {
+ val cache = client.cache(cacheName)
+
+ cache.query(new SqlFieldsQuery(
+ "ALTER TABLE test ADD COLUMN c int")).getAll
+ }
+
+ def dropColumnFromTable(client: Ignite, cacheName: String): Unit = {
+ val cache = client.cache(cacheName)
+
+ cache.query(new SqlFieldsQuery(
+ "ALTER TABLE test DROP COLUMN a")).getAll
+ }
+
case class JPersonWithAlias(
@(QuerySqlField @field) id: Long,
@(QuerySqlField @field)(name = "person_name", index = true) name: String)
diff --git a/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteSQLDataFrameWriteSpec.scala b/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteSQLDataFrameWriteSpec.scala
index 8d6f6fb..6480507 100644
--- a/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteSQLDataFrameWriteSpec.scala
+++ b/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteSQLDataFrameWriteSpec.scala
@@ -221,7 +221,7 @@ class IgniteSQLDataFrameWriteSpec extends AbstractDataFrameSpec {
.save()
}
- val tblInfo = sqlTableInfo[Any, Any](client, PERSON_TBL_NAME, None)
+ val tblInfo = sqlTableInfo(client, PERSON_TBL_NAME, None)
assert(tblInfo.isDefined, s"Table $PERSON_TBL_NAME should exist.")
}
@@ -237,7 +237,7 @@ class IgniteSQLDataFrameWriteSpec extends AbstractDataFrameSpec {
.save()
}
- val tblInfo = sqlTableInfo[Any, Any](client, PERSON_TBL_NAME, None)
+ val tblInfo = sqlTableInfo(client, PERSON_TBL_NAME, None)
assert(tblInfo.isDefined, s"Table $PERSON_TBL_NAME should exist.")
}
@@ -253,7 +253,7 @@ class IgniteSQLDataFrameWriteSpec extends AbstractDataFrameSpec {
.save()
}
- val tblInfo = sqlTableInfo[Any, Any](client, PERSON_TBL_NAME, None)
+ val tblInfo = sqlTableInfo(client, PERSON_TBL_NAME, None)
assert(tblInfo.isDefined, s"Table $PERSON_TBL_NAME should exist.")
}