Posted to commits@ignite.apache.org by sk...@apache.org on 2022/10/04 15:57:10 UTC
[ignite-extensions] 01/02: IGNITE-12519 Add support for lowercase object names. Fixes #178
This is an automated email from the ASF dual-hosted git repository.
sk0x50 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite-extensions.git
commit 91e74f28a3627404e2802afa0da24a5e364e611f
Author: Ivan Gagarkin <ga...@gmail.com>
AuthorDate: Tue Oct 4 18:54:14 2022 +0300
IGNITE-12519 Add support for lowercase object names. Fixes #178
Signed-off-by: Slava Koptilin <sl...@gmail.com>
---
.../org/apache/ignite/spark/impl/QueryUtils.scala | 12 ++
.../impl/optimization/AggregateExpressions.scala | 2 +-
.../impl/optimization/ConditionExpressions.scala | 2 +-
.../spark/impl/optimization/DateExpressions.scala | 2 +-
.../spark/impl/optimization/MathExpressions.scala | 2 +-
.../impl/optimization/SimpleExpressions.scala | 19 +--
.../impl/optimization/StringExpressions.scala | 2 +-
.../impl/optimization/SupportedExpressions.scala | 2 +-
.../impl/optimization/SystemExpressions.scala | 2 +-
.../accumulator/JoinSQLAccumulator.scala | 53 ++++----
.../accumulator/SingleTableSQLAccumulator.scala | 37 ++++--
.../accumulator/UnionSQLAccumulator.scala | 14 ++-
.../impl/optimization/accumulator/package.scala | 31 +++++
.../ignite/spark/impl/optimization/package.scala | 10 +-
.../ignite/spark/IgniteQueryCompilatorSpec.scala | 139 +++++++++++++++++++++
15 files changed, 273 insertions(+), 56 deletions(-)
diff --git a/modules/spark-3.2-ext/spark-3.2/src/main/scala/org/apache/ignite/spark/impl/QueryUtils.scala b/modules/spark-3.2-ext/spark-3.2/src/main/scala/org/apache/ignite/spark/impl/QueryUtils.scala
index 79aa523..8437b2f 100644
--- a/modules/spark-3.2-ext/spark-3.2/src/main/scala/org/apache/ignite/spark/impl/QueryUtils.scala
+++ b/modules/spark-3.2-ext/spark-3.2/src/main/scala/org/apache/ignite/spark/impl/QueryUtils.scala
@@ -26,6 +26,18 @@ import org.apache.spark.sql.types._
* Utility class for building SQL queries.
*/
private[impl] object QueryUtils extends Logging {
+ /** Adds quotes to the provided string if needed.
+ * @param str String to be quoted.
+ * @param needed Boolean flag that indicates whether the given string needs to be quoted.
+ * @return The resulting string.
+ */
+ def quoteStringIfNeeded(str: String, needed: Boolean): String = {
+ if (needed)
+ "\"" + str + "\""
+ else
+ str
+ }
+
/**
* Builds `where` part of SQL query.
*
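For illustration, a minimal standalone sketch of the quoting behavior added above (the wrapper object and sample identifier are hypothetical; the helper body matches the hunk):

object QuoteSketch {
  // Same logic as QueryUtils.quoteStringIfNeeded above: wrap the identifier in
  // double quotes only when quoting is requested (i.e. spark.sql.caseSensitive=true).
  def quoteStringIfNeeded(str: String, needed: Boolean): String =
    if (needed) "\"" + str + "\"" else str

  def main(args: Array[String]): Unit = {
    println(quoteStringIfNeeded("strings1", needed = false)) // strings1
    println(quoteStringIfNeeded("strings1", needed = true))  // "strings1"
  }
}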
diff --git a/modules/spark-3.2-ext/spark-3.2/src/main/scala/org/apache/ignite/spark/impl/optimization/AggregateExpressions.scala b/modules/spark-3.2-ext/spark-3.2/src/main/scala/org/apache/ignite/spark/impl/optimization/AggregateExpressions.scala
index 3e6b6b5..51f2fee 100644
--- a/modules/spark-3.2-ext/spark-3.2/src/main/scala/org/apache/ignite/spark/impl/optimization/AggregateExpressions.scala
+++ b/modules/spark-3.2-ext/spark-3.2/src/main/scala/org/apache/ignite/spark/impl/optimization/AggregateExpressions.scala
@@ -51,7 +51,7 @@ private[optimization] object AggregateExpressions extends SupportedExpressions {
/** @inheritdoc */
override def toString(expr: Expression, childToString: Expression ⇒ String, useQualifier: Boolean,
- useAlias: Boolean): Option[String] = expr match {
+ useAlias: Boolean, caseSensitive: Boolean): Option[String] = expr match {
case AggregateExpression(aggregateFunction, _, isDistinct, _, _) ⇒
aggregateFunction match {
case Count(children) ⇒
diff --git a/modules/spark-3.2-ext/spark-3.2/src/main/scala/org/apache/ignite/spark/impl/optimization/ConditionExpressions.scala b/modules/spark-3.2-ext/spark-3.2/src/main/scala/org/apache/ignite/spark/impl/optimization/ConditionExpressions.scala
index fbfbd64..6db2409 100644
--- a/modules/spark-3.2-ext/spark-3.2/src/main/scala/org/apache/ignite/spark/impl/optimization/ConditionExpressions.scala
+++ b/modules/spark-3.2-ext/spark-3.2/src/main/scala/org/apache/ignite/spark/impl/optimization/ConditionExpressions.scala
@@ -78,7 +78,7 @@ private[optimization] object ConditionExpressions extends SupportedExpressions {
/** @inheritdoc */
override def toString(expr: Expression, childToString: Expression ⇒ String, useQualifier: Boolean,
- useAlias: Boolean): Option[String] = expr match {
+ useAlias: Boolean, caseSensitive: Boolean): Option[String] = expr match {
case EqualTo(left, right) ⇒
Some(s"${childToString(left)} = ${childToString(right)}")
diff --git a/modules/spark-3.2-ext/spark-3.2/src/main/scala/org/apache/ignite/spark/impl/optimization/DateExpressions.scala b/modules/spark-3.2-ext/spark-3.2/src/main/scala/org/apache/ignite/spark/impl/optimization/DateExpressions.scala
index d075bf0..156d4fa 100644
--- a/modules/spark-3.2-ext/spark-3.2/src/main/scala/org/apache/ignite/spark/impl/optimization/DateExpressions.scala
+++ b/modules/spark-3.2-ext/spark-3.2/src/main/scala/org/apache/ignite/spark/impl/optimization/DateExpressions.scala
@@ -73,7 +73,7 @@ private[optimization] object DateExpressions extends SupportedExpressions {
/** @inheritdoc */
override def toString(expr: Expression, childToString: Expression ⇒ String, useQualifier: Boolean,
- useAlias: Boolean): Option[String] = expr match {
+ useAlias: Boolean, caseSensitive: Boolean): Option[String] = expr match {
case CurrentDate(_) ⇒
Some(s"CURRENT_DATE()")
diff --git a/modules/spark-3.2-ext/spark-3.2/src/main/scala/org/apache/ignite/spark/impl/optimization/MathExpressions.scala b/modules/spark-3.2-ext/spark-3.2/src/main/scala/org/apache/ignite/spark/impl/optimization/MathExpressions.scala
index 256cd78..99386ac 100644
--- a/modules/spark-3.2-ext/spark-3.2/src/main/scala/org/apache/ignite/spark/impl/optimization/MathExpressions.scala
+++ b/modules/spark-3.2-ext/spark-3.2/src/main/scala/org/apache/ignite/spark/impl/optimization/MathExpressions.scala
@@ -138,7 +138,7 @@ private[optimization] object MathExpressions extends SupportedExpressions {
/** @inheritdoc */
override def toString(expr: Expression, childToString: Expression ⇒ String, useQualifier: Boolean,
- useAlias: Boolean): Option[String] = expr match {
+ useAlias: Boolean, caseSensitive: Boolean): Option[String] = expr match {
case Abs(child, _) ⇒
Some(s"ABS(${childToString(child)})")
diff --git a/modules/spark-3.2-ext/spark-3.2/src/main/scala/org/apache/ignite/spark/impl/optimization/SimpleExpressions.scala b/modules/spark-3.2-ext/spark-3.2/src/main/scala/org/apache/ignite/spark/impl/optimization/SimpleExpressions.scala
index 37cb9e1..c32ecf5 100644
--- a/modules/spark-3.2-ext/spark-3.2/src/main/scala/org/apache/ignite/spark/impl/optimization/SimpleExpressions.scala
+++ b/modules/spark-3.2-ext/spark-3.2/src/main/scala/org/apache/ignite/spark/impl/optimization/SimpleExpressions.scala
@@ -17,13 +17,14 @@
package org.apache.ignite.spark.impl.optimization
+import org.apache.ignite.spark.impl.QueryUtils.quoteStringIfNeeded
+
import java.text.SimpleDateFormat
import org.apache.spark.sql.catalyst.expressions.{Expression, _}
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.types._
import java.time.ZoneOffset
-import java.util.TimeZone
/**
* Object to support some 'simple' expressions like aliases.
@@ -49,7 +50,7 @@ private[optimization] object SimpleExpressions extends SupportedExpressions {
/** @inheritdoc */
override def toString(expr: Expression, childToString: Expression ⇒ String, useQualifier: Boolean,
- useAlias: Boolean): Option[String] = expr match {
+ useAlias: Boolean, caseSensitive: Boolean): Option[String] = expr match {
case l: Literal ⇒
if (l.value == null)
Some("null")
@@ -75,8 +76,8 @@ private[optimization] object SimpleExpressions extends SupportedExpressions {
// Internal representation of DateType is Int.
// So we convert from the internal Spark representation to a CAST call.
case days: Integer ⇒
- val date = new java.util.Date(DateTimeUtils.microsToMillis(DateTimeUtils.daysToMicros(days, ZoneOffset
- .UTC))) // FIXME: default id
+ val date = new java.util.Date(DateTimeUtils.microsToMillis(
+ DateTimeUtils.daysToMicros(days, ZoneOffset.UTC))) // FIXME: default id
Some(s"CAST('${dateFormat.get.format(date)}' AS DATE)")
@@ -91,11 +92,13 @@ private[optimization] object SimpleExpressions extends SupportedExpressions {
case ar: AttributeReference ⇒
val name =
if (useQualifier)
- // TODO: add ticket to handle seq with two elements with qualifier for database name: related to the [SPARK-19602][SQL] ticket
- ar.qualifier.map(_ + "." + ar.name).find(_ => true).getOrElse(ar.name)
+ // TODO: add ticket to handle seq with two elements with qualifier for database name: related to the [SPARK-19602][SQL] ticket
+ ar.qualifier
+ .map(quoteStringIfNeeded(_, caseSensitive))
+ .map(_ + "." + quoteStringIfNeeded(ar.name, caseSensitive))
+ .find(_ => true).getOrElse(ar.name)
else
- ar.name
-
+ quoteStringIfNeeded(ar.name, caseSensitive)
if (ar.metadata.contains(ALIAS) &&
!isAliasEqualColumnName(ar.metadata.getString(ALIAS), ar.name) &&
useAlias) {
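For illustration, a standalone sketch of the qualifier/column rendering introduced in the hunk above (the render helper and the sample names "strings1" and "id" are hypothetical; the quoting logic mirrors the patched AttributeReference branch):

object AttributeRenderingSketch {
  def quoteStringIfNeeded(str: String, needed: Boolean): String =
    if (needed) "\"" + str + "\"" else str

  // Quote both the qualifier and the column name when case-sensitive mode is on;
  // fall back to the bare name when no qualifier is present.
  def render(qualifier: Seq[String], name: String, useQualifier: Boolean, caseSensitive: Boolean): String =
    if (useQualifier)
      qualifier
        .map(quoteStringIfNeeded(_, caseSensitive))
        .map(_ + "." + quoteStringIfNeeded(name, caseSensitive))
        .headOption
        .getOrElse(name)
    else
      quoteStringIfNeeded(name, caseSensitive)

  def main(args: Array[String]): Unit = {
    println(render(Seq("strings1"), "id", useQualifier = true, caseSensitive = true))  // "strings1"."id"
    println(render(Seq("strings1"), "id", useQualifier = true, caseSensitive = false)) // strings1.id
  }
}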
diff --git a/modules/spark-3.2-ext/spark-3.2/src/main/scala/org/apache/ignite/spark/impl/optimization/StringExpressions.scala b/modules/spark-3.2-ext/spark-3.2/src/main/scala/org/apache/ignite/spark/impl/optimization/StringExpressions.scala
index 733fe80..af10282 100644
--- a/modules/spark-3.2-ext/spark-3.2/src/main/scala/org/apache/ignite/spark/impl/optimization/StringExpressions.scala
+++ b/modules/spark-3.2-ext/spark-3.2/src/main/scala/org/apache/ignite/spark/impl/optimization/StringExpressions.scala
@@ -96,7 +96,7 @@ private[optimization] object StringExpressions extends SupportedExpressions {
/** @inheritdoc */
override def toString(expr: Expression, childToString: Expression ⇒ String, useQualifier: Boolean,
- useAlias: Boolean): Option[String] = expr match {
+ useAlias: Boolean, caseSensitive: Boolean): Option[String] = expr match {
case Ascii(child) ⇒
Some(s"ASCII(${childToString(child)})")
diff --git a/modules/spark-3.2-ext/spark-3.2/src/main/scala/org/apache/ignite/spark/impl/optimization/SupportedExpressions.scala b/modules/spark-3.2-ext/spark-3.2/src/main/scala/org/apache/ignite/spark/impl/optimization/SupportedExpressions.scala
index f46eb72..3926889 100644
--- a/modules/spark-3.2-ext/spark-3.2/src/main/scala/org/apache/ignite/spark/impl/optimization/SupportedExpressions.scala
+++ b/modules/spark-3.2-ext/spark-3.2/src/main/scala/org/apache/ignite/spark/impl/optimization/SupportedExpressions.scala
@@ -38,5 +38,5 @@ private[optimization] trait SupportedExpressions {
* @return SQL representation of `expr` if it supported. `None` otherwise.
*/
def toString(expr: Expression, childToString: (Expression) ⇒ String, useQualifier: Boolean,
- useAlias: Boolean): Option[String]
+ useAlias: Boolean, caseSensitive: Boolean): Option[String]
}
diff --git a/modules/spark-3.2-ext/spark-3.2/src/main/scala/org/apache/ignite/spark/impl/optimization/SystemExpressions.scala b/modules/spark-3.2-ext/spark-3.2/src/main/scala/org/apache/ignite/spark/impl/optimization/SystemExpressions.scala
index 40e4e29..66cfc71 100644
--- a/modules/spark-3.2-ext/spark-3.2/src/main/scala/org/apache/ignite/spark/impl/optimization/SystemExpressions.scala
+++ b/modules/spark-3.2-ext/spark-3.2/src/main/scala/org/apache/ignite/spark/impl/optimization/SystemExpressions.scala
@@ -71,7 +71,7 @@ private[optimization] object SystemExpressions extends SupportedExpressions {
/** @inheritdoc */
override def toString(expr: Expression, childToString: Expression ⇒ String, useQualifier: Boolean,
- useAlias: Boolean): Option[String] = expr match {
+ useAlias: Boolean, caseSensitive: Boolean): Option[String] = expr match {
case Coalesce(children) ⇒
Some(s"COALESCE(${children.map(childToString(_)).mkString(", ")})")
diff --git a/modules/spark-3.2-ext/spark-3.2/src/main/scala/org/apache/ignite/spark/impl/optimization/accumulator/JoinSQLAccumulator.scala b/modules/spark-3.2-ext/spark-3.2/src/main/scala/org/apache/ignite/spark/impl/optimization/accumulator/JoinSQLAccumulator.scala
index 05e5aeb..dd7804e 100644
--- a/modules/spark-3.2-ext/spark-3.2/src/main/scala/org/apache/ignite/spark/impl/optimization/accumulator/JoinSQLAccumulator.scala
+++ b/modules/spark-3.2-ext/spark-3.2/src/main/scala/org/apache/ignite/spark/impl/optimization/accumulator/JoinSQLAccumulator.scala
@@ -18,14 +18,15 @@
package org.apache.ignite.spark.impl.optimization.accumulator
import org.apache.ignite.IgniteException
+import org.apache.ignite.spark.impl.QueryUtils.quoteStringIfNeeded
import org.apache.ignite.spark.impl.optimization._
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Expression, NamedExpression, SortOrder}
import org.apache.spark.sql.catalyst.plans.logical.{BinaryNode, LogicalPlan}
import org.apache.spark.sql.catalyst.plans.{Inner, JoinType, LeftOuter, RightOuter}
/**
- * Accumulator to store information about join query.
- */
+ * Accumulator to store information about join query.
+ */
private[apache] case class JoinSQLAccumulator(
igniteQueryContext: IgniteQueryContext,
left: QueryAccumulator,
@@ -47,29 +48,35 @@ private[apache] case class JoinSQLAccumulator(
override def compileQuery(prettyPrint: Boolean = false, nestedQuery: Boolean = false): String = {
val delim = if (prettyPrint) "\n" else " "
val tab = if (prettyPrint) " " else ""
+ val caseSensitiveEnabled = isCaseSensitiveEnabled(igniteQueryContext)
var sql = s"SELECT$delim$tab" +
- s"${fixQualifier(outputExpressions).map(exprToString(_, useQualifier = true)).mkString(", ")}$delim" +
+ s"${fixQualifier(outputExpressions)
+ .map(exprToString(_, useQualifier = true, caseSensitive = caseSensitiveEnabled)).mkString(", ")}$delim" +
s"FROM$delim$tab$compileJoinExpr"
if (allFilters.nonEmpty)
sql += s"${delim}WHERE$delim$tab" +
- s"${fixQualifier(allFilters).map(exprToString(_, useQualifier = true)).mkString(s" AND$delim$tab")}"
+ s"${fixQualifier(allFilters)
+ .map(exprToString(_, useQualifier = true, caseSensitive = caseSensitiveEnabled)).mkString(s" AND$delim$tab")}"
if (groupBy.exists(_.nonEmpty))
sql += s"${delim}GROUP BY " +
- s"${fixQualifier(groupBy.get).map(exprToString(_, useQualifier = true)).mkString(s",$delim$tab")}"
+ s"${fixQualifier(groupBy.get)
+ .map(exprToString(_, useQualifier = true, caseSensitive = caseSensitiveEnabled)).mkString(s",$delim$tab")}"
if (having.exists(_.nonEmpty))
sql += s"${delim}HAVING " +
- s"${fixQualifier(having.get).map(exprToString(_, useQualifier = true)).mkString(s" AND$delim$tab")}"
+ s"${fixQualifier(having.get)
+ .map(exprToString(_, useQualifier = true, caseSensitive = caseSensitiveEnabled)).mkString(s" AND$delim$tab")}"
if (orderBy.exists(_.nonEmpty))
sql += s"${delim}ORDER BY " +
- s"${fixQualifier(orderBy.get).map(exprToString(_, useQualifier = true)).mkString(s",$delim$tab")}"
+ s"${fixQualifier(orderBy.get)
+ .map(exprToString(_, useQualifier = true, caseSensitive = caseSensitiveEnabled)).mkString(s",$delim$tab")}"
if (limit.isDefined) {
- sql += s" LIMIT ${exprToString(fixQualifier0(limit.get), useQualifier = true)}"
+ sql += s" LIMIT ${exprToString(fixQualifier0(limit.get), useQualifier = true, caseSensitive = caseSensitiveEnabled)}"
if (nestedQuery)
sql = s"SELECT * FROM ($sql)"
@@ -79,8 +86,8 @@ private[apache] case class JoinSQLAccumulator(
}
/**
- * @return Filters for this query.
- */
+ * @return Filters for this query.
+ */
private def allFilters: Seq[Expression] = {
val leftFilters =
if (isSimpleTableAcc(left))
@@ -97,24 +104,27 @@ private[apache] case class JoinSQLAccumulator(
}
/**
- * @return `table1 LEFT JOIN (SELECT....FROM...) table2` part of join query.
- */
+ * @return `table1 LEFT JOIN (SELECT....FROM...) table2` part of join query.
+ */
private def compileJoinExpr: String = {
+ val caseSensitiveEnabled = isCaseSensitiveEnabled(igniteQueryContext)
+
val leftJoinSql =
if (isSimpleTableAcc(left))
- left.asInstanceOf[SingleTableSQLAccumulator].table.get
+ quoteStringIfNeeded(left.asInstanceOf[SingleTableSQLAccumulator].table.get, caseSensitiveEnabled)
else
s"(${left.compileQuery()}) ${leftAlias.get}"
val rightJoinSql = {
val leftTableName =
if (isSimpleTableAcc(left))
- left.qualifier
+ quoteStringIfNeeded(left.qualifier, caseSensitiveEnabled)
else
leftAlias.get
if (isSimpleTableAcc(right)) {
- val rightTableName = right.asInstanceOf[SingleTableSQLAccumulator].table.get
+ val rightTableName =
+ quoteStringIfNeeded(right.asInstanceOf[SingleTableSQLAccumulator].table.get, caseSensitiveEnabled)
if (leftTableName == rightTableName)
s"$rightTableName as ${rightAlias.get}"
@@ -125,7 +135,8 @@ private[apache] case class JoinSQLAccumulator(
}
s"$leftJoinSql $joinTypeSQL $rightJoinSql" +
- s"${condition.map(expr ⇒ s" ON ${exprToString(fixQualifier0(expr), useQualifier = true)}").getOrElse("")}"
+ s"${condition.map(expr ⇒ s" ON ${exprToString(fixQualifier0(expr),
+ useQualifier = true, caseSensitive = caseSensitiveEnabled)}").getOrElse("")}"
}
/**
@@ -170,11 +181,11 @@ private[apache] case class JoinSQLAccumulator(
}
/**
- * Find right qualifier for a `attr`.
- *
- * @param attr Attribute to fix qualifier in
- * @return Right qualifier for a `attr`
- */
+ * Find right qualifier for a `attr`.
+ *
+ * @param attr Attribute to fix qualifier in
+ * @return Right qualifier for a `attr`
+ */
private def findQualifier(attr: AttributeReference): String = {
val leftTableName =
if (isSimpleTableAcc(left))
diff --git a/modules/spark-3.2-ext/spark-3.2/src/main/scala/org/apache/ignite/spark/impl/optimization/accumulator/SingleTableSQLAccumulator.scala b/modules/spark-3.2-ext/spark-3.2/src/main/scala/org/apache/ignite/spark/impl/optimization/accumulator/SingleTableSQLAccumulator.scala
index 2f56d9e..129e08b 100644
--- a/modules/spark-3.2-ext/spark-3.2/src/main/scala/org/apache/ignite/spark/impl/optimization/accumulator/SingleTableSQLAccumulator.scala
+++ b/modules/spark-3.2-ext/spark-3.2/src/main/scala/org/apache/ignite/spark/impl/optimization/accumulator/SingleTableSQLAccumulator.scala
@@ -23,10 +23,10 @@ import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, NamedEx
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
/**
- * Class for accumulating parts of SQL query to a single Ignite table.
- *
- * See <a href="http://www.h2database.com/html/grammar.html#select">select syntax of H2</a>.
- */
+ * Class for accumulating parts of SQL query to a single Ignite table.
+ *
+ * See <a href="http://www.h2database.com/html/grammar.html#select">select syntax of H2</a>.
+ */
private[apache] case class SingleTableSQLAccumulator(
igniteQueryContext: IgniteQueryContext,
table: Option[String],
@@ -45,24 +45,30 @@ private[apache] case class SingleTableSQLAccumulator(
override def compileQuery(prettyPrint: Boolean = false, nestedQuery: Boolean = false): String = {
val delim = if (prettyPrint) "\n" else " "
val tab = if (prettyPrint) " " else ""
+ val caseSensitiveEnabled = isCaseSensitiveEnabled(igniteQueryContext)
- var sql = s"SELECT$delim$tab${outputExpressions.map(exprToString(_)).mkString(", ")}${delim}" +
+ var sql = s"SELECT$delim$tab${outputExpressions
+ .map(exprToString(_, caseSensitive = caseSensitiveEnabled)).mkString(", ")}${delim}" +
s"FROM$delim$tab$compiledTableExpression"
if (where.exists(_.nonEmpty))
- sql += s"${delim}WHERE$delim$tab${where.get.map(exprToString(_)).mkString(s" AND$delim$tab")}"
+ sql += s"${delim}WHERE$delim$tab${where.get
+ .map(exprToString(_, caseSensitive = caseSensitiveEnabled)).mkString(s" AND$delim$tab")}"
if (groupBy.exists(_.nonEmpty))
- sql += s"${delim}GROUP BY ${groupBy.get.map(exprToString(_)).mkString(s",$delim$tab")}"
+ sql += s"${delim}GROUP BY ${groupBy.get
+ .map(exprToString(_, caseSensitive = caseSensitiveEnabled)).mkString(s",$delim$tab")}"
if (having.exists(_.nonEmpty))
- sql += s"${delim}HAVING ${having.get.map(exprToString(_)).mkString(s" AND$delim$tab")}"
+ sql += s"${delim}HAVING ${having.get
+ .map(exprToString(_, caseSensitive = caseSensitiveEnabled)).mkString(s" AND$delim$tab")}"
if (orderBy.exists(_.nonEmpty))
- sql += s"${delim}ORDER BY ${orderBy.get.map(exprToString(_)).mkString(s",$delim$tab")}"
+ sql += s"${delim}ORDER BY ${orderBy.get
+ .map(exprToString(_, caseSensitive = caseSensitiveEnabled)).mkString(s",$delim$tab")}"
if (limit.isDefined) {
- sql += s" LIMIT ${limit.map(exprToString(_)).get}"
+ sql += s" LIMIT ${limit.map(exprToString(_, caseSensitive = caseSensitiveEnabled)).get}"
if (nestedQuery)
sql = s"SELECT * FROM ($sql)"
@@ -72,11 +78,16 @@ private[apache] case class SingleTableSQLAccumulator(
}
/**
- * @return From table SQL query part.
- */
+ * @return From table SQL query part.
+ */
private def compiledTableExpression: String = table match {
case Some(tableName) ⇒
- tableName
+ val caseSens = igniteQueryContext.sqlContext
+ .getConf("spark.sql.caseSensitive", "false").toBoolean
+ if (caseSens)
+ "\"" + tableName + "\""
+ else
+ tableName
case None ⇒ tableExpression match {
case Some((acc, alias)) ⇒
diff --git a/modules/spark-3.2-ext/spark-3.2/src/main/scala/org/apache/ignite/spark/impl/optimization/accumulator/UnionSQLAccumulator.scala b/modules/spark-3.2-ext/spark-3.2/src/main/scala/org/apache/ignite/spark/impl/optimization/accumulator/UnionSQLAccumulator.scala
index 29bfcda..d49828f 100644
--- a/modules/spark-3.2-ext/spark-3.2/src/main/scala/org/apache/ignite/spark/impl/optimization/accumulator/UnionSQLAccumulator.scala
+++ b/modules/spark-3.2-ext/spark-3.2/src/main/scala/org/apache/ignite/spark/impl/optimization/accumulator/UnionSQLAccumulator.scala
@@ -36,18 +36,20 @@ private[apache] case class UnionSQLAccumulator(
override def compileQuery(prettyPrint: Boolean = false, nestedQuery: Boolean = false): String = {
val delim = if (prettyPrint) "\n" else " "
val tab = if (prettyPrint) " " else ""
+ val caseSensitiveEnabled = isCaseSensitiveEnabled(igniteQueryContext)
var query = children.map(_.compileQuery(prettyPrint, nestedQuery = true)).mkString(s"${delim}UNION$delim")
query = orderBy match {
case Some(sortOrders) ⇒
- query + s"${delim}ORDER BY ${sortOrders.map(exprToString(_)).mkString(s",$delim$tab")}"
+ query + s"${delim}ORDER BY ${sortOrders
+ .map(exprToString(_, caseSensitive = caseSensitiveEnabled)).mkString(s",$delim$tab")}"
case None ⇒ query
}
if (limit.isDefined) {
- query += s" LIMIT ${exprToString(limit.get)}"
+ query += s" LIMIT ${exprToString(limit.get, caseSensitive = caseSensitiveEnabled)}"
if (nestedQuery)
query = s"SELECT * FROM ($query)"
@@ -57,8 +59,12 @@ private[apache] case class UnionSQLAccumulator(
}
/** @inheritdoc */
- override def simpleString(maxFields: Int): String =
- s"UnionSQLAccumulator(orderBy: ${orderBy.map(_.map(exprToString(_)).mkString(", ")).getOrElse("[]")})"
+ override def simpleString(maxFields: Int): String = {
+ val caseSensitiveEnabled = isCaseSensitiveEnabled(igniteQueryContext)
+
+ s"UnionSQLAccumulator(orderBy: ${orderBy.map(_.map(exprToString(_, caseSensitive = caseSensitiveEnabled))
+ .mkString(", ")).getOrElse("[]")})"
+ }
/** @inheritdoc */
override def withOutputExpressions(outputExpressions: Seq[NamedExpression]): QueryAccumulator =
diff --git a/modules/spark-3.2-ext/spark-3.2/src/main/scala/org/apache/ignite/spark/impl/optimization/accumulator/package.scala b/modules/spark-3.2-ext/spark-3.2/src/main/scala/org/apache/ignite/spark/impl/optimization/accumulator/package.scala
new file mode 100644
index 0000000..5416b80
--- /dev/null
+++ b/modules/spark-3.2-ext/spark-3.2/src/main/scala/org/apache/ignite/spark/impl/optimization/accumulator/package.scala
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spark.impl.optimization
+
+package object accumulator {
+
+ /**
+ * Reads the Spark SQL configuration and returns the value of the "spark.sql.caseSensitive" property.
+ * @param igniteQueryContext Ignite query context to read the configuration from.
+ * @return Value of the "spark.sql.caseSensitive" configuration property.
+ */
+ def isCaseSensitiveEnabled(igniteQueryContext: IgniteQueryContext): Boolean = {
+ igniteQueryContext.sqlContext
+ .getConf("spark.sql.caseSensitive", "false").toBoolean
+ }
+}
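For reference, a standalone sketch of the same property lookup performed against an ordinary SparkSession (the session setup here is illustrative only; the patch reads the property through IgniteQueryContext.sqlContext):

import org.apache.spark.sql.SparkSession

object CaseSensitivityCheck {
  def main(args: Array[String]): Unit = {
    // Illustrative local session; any existing session with the property set works the same way.
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("case-sensitivity-check")
      .config("spark.sql.caseSensitive", "true")
      .getOrCreate()

    // Same lookup as isCaseSensitiveEnabled above: a missing property defaults to "false".
    val caseSensitive = spark.sqlContext.getConf("spark.sql.caseSensitive", "false").toBoolean
    println(s"caseSensitive = $caseSensitive") // true

    spark.stop()
  }
}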
diff --git a/modules/spark-3.2-ext/spark-3.2/src/main/scala/org/apache/ignite/spark/impl/optimization/package.scala b/modules/spark-3.2-ext/spark-3.2/src/main/scala/org/apache/ignite/spark/impl/optimization/package.scala
index 5526cad..3c221a8 100644
--- a/modules/spark-3.2-ext/spark-3.2/src/main/scala/org/apache/ignite/spark/impl/optimization/package.scala
+++ b/modules/spark-3.2-ext/spark-3.2/src/main/scala/org/apache/ignite/spark/impl/optimization/package.scala
@@ -53,15 +53,19 @@ package object optimization {
* @param useAlias If true outputs `expr` with alias.
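+ * @param caseSensitive If true, object names are quoted to preserve their case.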
* @return String representation of expression.
*/
- def exprToString(expr: Expression, useQualifier: Boolean = false, useAlias: Boolean = true): String = {
+ def exprToString(expr: Expression,
+ useQualifier: Boolean = false,
+ useAlias: Boolean = true,
+ caseSensitive: Boolean = false): String = {
@tailrec
def exprToString0(expr: Expression, supportedExpressions: List[SupportedExpressions]): Option[String] =
if (supportedExpressions.nonEmpty) {
val exprStr = supportedExpressions.head.toString(
expr,
- exprToString(_, useQualifier, useAlias = false),
+ exprToString(_, useQualifier, useAlias = false, caseSensitive),
useQualifier,
- useAlias)
+ useAlias,
+ caseSensitive)
exprStr match {
case res: Some[String] ⇒
diff --git a/modules/spark-3.2-ext/spark-3.2/src/test/scala/org/apache/ignite/spark/IgniteQueryCompilatorSpec.scala b/modules/spark-3.2-ext/spark-3.2/src/test/scala/org/apache/ignite/spark/IgniteQueryCompilatorSpec.scala
new file mode 100644
index 0000000..c0054c6
--- /dev/null
+++ b/modules/spark-3.2-ext/spark-3.2/src/test/scala/org/apache/ignite/spark/IgniteQueryCompilatorSpec.scala
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spark
+
+import org.apache.ignite.Ignite
+import org.apache.ignite.cache.query.SqlFieldsQuery
+import org.apache.ignite.internal.IgnitionEx
+import org.apache.ignite.spark.AbstractDataFrameSpec.{DEFAULT_CACHE, TEST_CONFIG_FILE, checkOptimizationResult, enclose}
+import org.apache.ignite.spark.IgniteDataFrameSettings.{FORMAT_IGNITE, OPTION_CONFIG_FILE, OPTION_TABLE}
+import org.apache.spark.sql.ignite.IgniteSparkSession
+import org.junit.runner.RunWith
+import org.scalatestplus.junit.JUnitRunner
+
+import java.lang.{Long => JLong}
+
+@RunWith(classOf[JUnitRunner])
+class IgniteQueryCompilatorSpec extends AbstractDataFrameSpec {
+ var igniteSession: IgniteSparkSession = _
+
+ describe("Supported column and table names in lower case") {
+
+ it("should successfully read table data via DataFrameReader") {
+ val igniteDF = igniteSession.read
+ .format(FORMAT_IGNITE)
+ .option(OPTION_TABLE, "strings1")
+ .option(OPTION_CONFIG_FILE, TEST_CONFIG_FILE)
+ .load()
+
+ assertResult(9)(igniteDF.count())
+ }
+
+ it("should successfully read table data from a single table via sql()") {
+ val df = igniteSession.sql("SELECT UPPER(str) FROM strings1 WHERE id = 1")
+
+ checkOptimizationResult(df, "SELECT UPPER(\"str\") AS \"upper(str)\" FROM \"strings1\" WHERE \"id\" = 1")
+
+ val data = Tuple1("AAA")
+
+ checkQueryData(df, data)
+ }
+
+ it("should successfully read table data from unioned tables via sql()") {
+ val df = igniteSession.sql(
+ "SELECT UPPER(str) FROM strings1 WHERE id = 1 " +
+ "UNION " +
+ "SELECT UPPER(str) FROM strings2 WHERE id = 7"
+ )
+
+ checkOptimizationResult(df, "SELECT \"upper(str)\" FROM (" +
+ "SELECT UPPER(\"str\") AS \"upper(str)\" FROM \"strings1\" WHERE \"id\" = 1 " +
+ "UNION " +
+ "SELECT UPPER(\"str\") AS \"upper(str)\" FROM \"strings2\" WHERE \"id\" = 7" +
+ ") table1")
+
+ val data = (
+ ("222"),
+ ("AAA")
+ )
+
+ checkQueryData(df, data)
+ }
+
+ it("should successfully read table data from joined tables via sql()") {
+ val df = igniteSession.sql("SELECT UPPER(s1.str) FROM strings1 s1 JOIN strings2 s2 ON s1.id = s2.id " +
+ "WHERE s1.id = 1")
+
+ checkOptimizationResult(df, "SELECT UPPER(\"strings1\".\"str\") AS \"upper(str)\" " +
+ "FROM \"strings1\" JOIN \"strings2\" ON \"strings1\".\"id\" = \"strings2\".\"id\" " +
+ "WHERE \"strings1\".\"id\" = 1 AND \"strings2\".\"id\" = 1")
+
+ val data = Tuple1("AAA")
+
+ checkQueryData(df, data)
+ }
+
+ }
+
+ override protected def beforeAll(): Unit = {
+ super.beforeAll()
+
+ createStringTable(client, DEFAULT_CACHE, "strings1")
+ createStringTable(client, DEFAULT_CACHE, "strings2")
+
+ val configProvider = enclose(null)(x ⇒ () ⇒ {
+ val cfg = IgnitionEx.loadConfiguration(TEST_CONFIG_FILE).get1()
+
+ cfg.setClientMode(true)
+
+ cfg.setIgniteInstanceName("client-2")
+
+ cfg
+ })
+
+ igniteSession = IgniteSparkSession.builder()
+ .config(spark.sparkContext.getConf)
+ .config("spark.sql.caseSensitive", "true")
+ .igniteConfigProvider(configProvider)
+ .getOrCreate()
+ }
+
+ def createStringTable(client: Ignite, cacheName: String, tableName: String): Unit = {
+ val cache = client.cache(cacheName)
+
+ cache.query(new SqlFieldsQuery(
+ s"""
+ | CREATE TABLE "$tableName" (
+ | "id" LONG,
+ | "str" VARCHAR,
+ | PRIMARY KEY ("id")) WITH "backups=1"
+ """.stripMargin)).getAll
+
+ val qry = new SqlFieldsQuery(s"""INSERT INTO \"$tableName\" (\"id\", \"str\") values (?, ?)""")
+
+ cache.query(qry.setArgs(1L.asInstanceOf[JLong], "aaa")).getAll
+ cache.query(qry.setArgs(2L.asInstanceOf[JLong], "AAA")).getAll
+ cache.query(qry.setArgs(3L.asInstanceOf[JLong], "AAA ")).getAll
+ cache.query(qry.setArgs(4L.asInstanceOf[JLong], " AAA")).getAll
+ cache.query(qry.setArgs(5L.asInstanceOf[JLong], " AAA ")).getAll
+ cache.query(qry.setArgs(6L.asInstanceOf[JLong], "ABCDEF")).getAll
+ cache.query(qry.setArgs(7L.asInstanceOf[JLong], "222")).getAll
+ cache.query(qry.setArgs(8L.asInstanceOf[JLong], null)).getAll
+ cache.query(qry.setArgs(9L.asInstanceOf[JLong], "BAAAB")).getAll
+ }
+}