Posted to commits@ignite.apache.org by ag...@apache.org on 2018/04/23 07:12:35 UTC
[39/50] [abbrv] ignite git commit: IGNITE-7077: Implementation of
Spark query optimization. - Fixes #3397.
http://git-wip-us.apache.org/repos/asf/ignite/blob/10a4c48b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/optimization/accumulator/UnionSQLAccumulator.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/impl/optimization/accumulator/UnionSQLAccumulator.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/optimization/accumulator/UnionSQLAccumulator.scala
new file mode 100644
index 0000000..723e17a
--- /dev/null
+++ b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/optimization/accumulator/UnionSQLAccumulator.scala
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spark.impl.optimization.accumulator
+
+import org.apache.ignite.spark.impl.optimization.{IgniteQueryContext, exprToString, toAttributeReference}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, NamedExpression, SortOrder}
+
+/**
+ * Accumulator to store info about a UNION query.
+ */
+private[apache] case class UnionSQLAccumulator(
+ igniteQueryContext: IgniteQueryContext,
+ children: Seq[QueryAccumulator],
+ outputExpressions: Seq[NamedExpression],
+ orderBy: Option[Seq[SortOrder]] = None
+) extends QueryAccumulator {
+ /** @inheritdoc */
+ override def compileQuery(prettyPrint: Boolean = false): String = {
+ val delim = if (prettyPrint) "\n" else " "
+ val tab = if (prettyPrint) " " else ""
+
+ val query = children.map(_.compileQuery(prettyPrint)).mkString(s"${delim}UNION$delim")
+
+ orderBy match {
+ case Some(sortOrders) ⇒
+ query + s"${delim}ORDER BY ${sortOrders.map(exprToString(_)).mkString(s",$delim$tab")}"
+
+ case None ⇒ query
+ }
+ }
+
+ /** @inheritdoc */
+ override def simpleString: String =
+ s"UnionSQLAccumulator(orderBy: ${orderBy.map(_.map(exprToString(_)).mkString(", ")).getOrElse("[]")})"
+
+ /** @inheritdoc */
+ override def withOutputExpressions(outputExpressions: Seq[NamedExpression]): QueryAccumulator =
+ copy(outputExpressions = outputExpressions)
+
+ /** @inheritdoc */
+ override def withOrderBy(orderBy: Seq[SortOrder]): QueryAccumulator = copy(orderBy = Some(orderBy))
+
+ /** @inheritdoc */
+ override def output: Seq[Attribute] = outputExpressions.map(toAttributeReference(_, Seq.empty))
+
+ /** @inheritdoc */
+ override lazy val qualifier: String = igniteQueryContext.uniqueTableAlias
+}
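
For context, a minimal sketch of how this accumulator compiles a UNION (the context,
children and sort order below are hypothetical placeholders, not part of the patch):

    // Assuming childA and childB are accumulators that compile to
    // "SELECT ID FROM A" and "SELECT ID FROM B", and ctx/idSort are a
    // prepared IgniteQueryContext and SortOrder:
    val union = UnionSQLAccumulator(ctx, Seq(childA, childB), childA.outputExpressions)

    union.compileQuery()
    // "SELECT ID FROM A UNION SELECT ID FROM B"

    union.withOrderBy(Seq(idSort)).compileQuery()
    // presumably "SELECT ID FROM A UNION SELECT ID FROM B ORDER BY ID"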
http://git-wip-us.apache.org/repos/asf/ignite/blob/10a4c48b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/optimization/package.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/impl/optimization/package.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/optimization/package.scala
new file mode 100644
index 0000000..4e168f4
--- /dev/null
+++ b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/optimization/package.scala
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spark.impl
+
+
+import org.apache.ignite.IgniteException
+import org.apache.ignite.spark.impl.optimization.accumulator.{QueryAccumulator, SingleTableSQLAccumulator}
+import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, Count}
+import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, ExprId, Expression, NamedExpression}
+import org.apache.spark.sql.types._
+
+import scala.annotation.tailrec
+
+/**
+ * Helpers to convert Catalyst expressions into Ignite SQL and to check
+ * whether expressions can be pushed down to Ignite.
+ */
+package object optimization {
+ /**
+ * Constant to store alias in column metadata.
+ */
+ private[optimization] val ALIAS: String = "alias"
+
+ /**
+ * All `SupportedExpressions` implementations.
+ */
+ private val SUPPORTED_EXPRESSIONS: List[SupportedExpressions] = List (
+ SimpleExpressions,
+ SystemExpressions,
+ AggregateExpressions,
+ ConditionExpressions,
+ DateExpressions,
+ MathExpressions,
+ StringExpressions
+ )
+
+ /**
+ * @param expr Expression.
+ * @param useQualifier If true, outputs attributes of `expr` with a qualifier.
+ * @param useAlias If true, outputs `expr` with an alias.
+ * @return String representation of expression.
+ */
+ def exprToString(expr: Expression, useQualifier: Boolean = false, useAlias: Boolean = true): String = {
+ @tailrec
+ def exprToString0(expr: Expression, supportedExpressions: List[SupportedExpressions]): Option[String] =
+ if (supportedExpressions.nonEmpty) {
+ val exprStr = supportedExpressions.head.toString(
+ expr,
+ exprToString(_, useQualifier, useAlias = false),
+ useQualifier,
+ useAlias)
+
+ exprStr match {
+ case res: Some[String] ⇒
+ res
+ case None ⇒
+ exprToString0(expr, supportedExpressions.tail)
+ }
+ }
+ else
+ None
+
+ exprToString0(expr, SUPPORTED_EXPRESSIONS) match {
+ case Some(str) ⇒ str
+
+ case None ⇒
+ throw new IgniteException("Unsupporte expression " + expr)
+ }
+ }
+
+ /**
+ * @param exprs Expressions to check.
+ * @return True if `exprs` contains only allowed expressions (i.e. ones that can be pushed down to Ignite), false otherwise.
+ */
+ def exprsAllowed(exprs: Seq[Expression]): Boolean =
+ exprs.forall(exprsAllowed)
+
+ /**
+ * @param expr Expression to check.
+ * @return True if `expr` is allowed (i.e. can be pushed down to Ignite), false otherwise.
+ */
+ def exprsAllowed(expr: Expression): Boolean =
+ SUPPORTED_EXPRESSIONS.exists(_(expr, exprsAllowed))
+
+ /**
+ * Converts `input` into `AttributeReference`.
+ *
+ * @param input Expression to convert.
+ * @param existingOutput Existing output.
+ * @param exprId Optional expression ID to use.
+ * @param alias Optional alias for a result.
+ * @return Converted expression.
+ */
+ def toAttributeReference(input: Expression, existingOutput: Seq[NamedExpression], exprId: Option[ExprId] = None,
+ alias: Option[String] = None): AttributeReference = {
+
+ input match {
+ case attr: AttributeReference ⇒
+ val toCopy = existingOutput.find(_.exprId == attr.exprId).getOrElse(attr)
+
+ AttributeReference(
+ name = toCopy.name,
+ dataType = toCopy.dataType,
+ metadata = alias
+ .map(new MetadataBuilder().withMetadata(toCopy.metadata).putString(ALIAS, _).build())
+ .getOrElse(toCopy.metadata)
+ )(exprId = exprId.getOrElse(toCopy.exprId), qualifier = toCopy.qualifier, isGenerated = toCopy.isGenerated)
+
+ case a: Alias ⇒
+ toAttributeReference(a.child, existingOutput, Some(a.exprId), Some(alias.getOrElse(a.name)))
+
+ case agg: AggregateExpression ⇒
+ agg.aggregateFunction match {
+ case c: Count ⇒
+ if (agg.isDistinct)
+ AttributeReference(
+ name = s"COUNT(DISTINCT ${c.children.map(exprToString(_)).mkString(" ")})",
+ dataType = LongType,
+ metadata = alias
+ .map(new MetadataBuilder().putString(ALIAS, _).build())
+ .getOrElse(Metadata.empty)
+ )(exprId = exprId.getOrElse(agg.resultId))
+ else
+ AttributeReference(
+ name = s"COUNT(${c.children.map(exprToString(_)).mkString(" ")})",
+ dataType = LongType,
+ metadata = alias
+ .map(new MetadataBuilder().putString(ALIAS, _).build())
+ .getOrElse(Metadata.empty)
+ )(exprId = exprId.getOrElse(agg.resultId))
+
+ case _ ⇒
+ toAttributeReference(agg.aggregateFunction, existingOutput, Some(exprId.getOrElse(agg.resultId)), alias)
+ }
+
+ case ne: NamedExpression ⇒
+ AttributeReference(
+ name = exprToString(input),
+ dataType = input.dataType,
+ metadata = alias
+ .map(new MetadataBuilder().withMetadata(ne.metadata).putString(ALIAS, _).build())
+ .getOrElse(Metadata.empty)
+ )(exprId = exprId.getOrElse(ne.exprId))
+
+ case _ if exprsAllowed(input) ⇒
+ AttributeReference(
+ name = exprToString(input),
+ dataType = input.dataType,
+ metadata = alias
+ .map(new MetadataBuilder().putString(ALIAS, _).build())
+ .getOrElse(Metadata.empty)
+ )(exprId = exprId.getOrElse(NamedExpression.newExprId))
+
+ case _ ⇒
+ throw new IgniteException(s"Unsupported column expression $input")
+ }
+ }
+
+ /**
+ * @param dataType Spark data type.
+ * @return SQL data type.
+ */
+ def toSqlType(dataType: DataType): String = dataType match {
+ case BooleanType ⇒ "BOOLEAN"
+ case IntegerType ⇒ "INT"
+ case ByteType ⇒ "TINYINT"
+ case ShortType ⇒ "SMALLINT"
+ case LongType ⇒ "BIGINT"
+ case DecimalType() ⇒ "DECIMAL"
+ case DoubleType ⇒ "DOUBLE"
+ case FloatType ⇒ "REAL"
+ case DateType ⇒ "DATE"
+ case TimestampType ⇒ "TIMESTAMP"
+ case StringType ⇒ "VARCHAR"
+ case BinaryType ⇒ "BINARY"
+ case ArrayType(_, _) ⇒ "ARRAY"
+ case _ ⇒
+ throw new IgniteException(s"$dataType not supported!")
+ }
+
+ /**
+ * @param expr Expression.
+ * @return True if the expression or any of its children is an AggregateExpression, false otherwise.
+ */
+ def hasAggregateInside(expr: Expression): Boolean = {
+ def hasAggregateInside0(expr: Expression): Boolean = expr match {
+ case AggregateExpression(_, _, _, _) ⇒
+ true
+
+ case e: Expression ⇒
+ e.children.exists(hasAggregateInside0)
+ }
+
+ hasAggregateInside0(expr)
+ }
+
+ /**
+ * Checks whether `acc` represents a simple query, i.e. a `SELECT ... FROM table WHERE ...` query
+ * without aggregation, limits, ordering or embedded SELECT expressions.
+ *
+ * @param acc Accumulator to check.
+ * @return True if accumulator stores simple query info, false otherwise.
+ */
+ def isSimpleTableAcc(acc: QueryAccumulator): Boolean = acc match {
+ case acc: SingleTableSQLAccumulator if acc.table.isDefined ⇒
+ acc.groupBy.isEmpty &&
+ acc.localLimit.isEmpty &&
+ acc.orderBy.isEmpty &&
+ !acc.distinct &&
+ !acc.outputExpressions.exists(hasAggregateInside)
+
+ case _ ⇒
+ false
+ }
+}
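
A rough usage sketch for the helpers above (expressions built by hand; the exact
rendered strings depend on the SupportedExpressions implementations, so the
result comments are assumptions):

    import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, Literal}
    import org.apache.spark.sql.types.{LongType, StringType}

    val id = AttributeReference("ID", LongType)()

    exprsAllowed(EqualTo(id, Literal(1L))) // true: the comparison can be pushed down to Ignite
    exprToString(EqualTo(id, Literal(1L))) // presumably "ID = 1"
    toSqlType(LongType)                    // "BIGINT"
    toSqlType(StringType)                  // "VARCHAR"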
http://git-wip-us.apache.org/repos/asf/ignite/blob/10a4c48b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/package.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/impl/package.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/package.scala
index 4634a97..6502c0f 100644
--- a/modules/spark/src/main/scala/org/apache/ignite/spark/impl/package.scala
+++ b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/package.scala
@@ -19,12 +19,15 @@ package org.apache.ignite.spark
import org.apache.commons.lang.StringUtils.equalsIgnoreCase
import org.apache.ignite.{Ignite, IgniteException, IgniteState, Ignition}
-import org.apache.ignite.cache.QueryEntity
+import org.apache.ignite.cache.{CacheMode, QueryEntity}
+import org.apache.ignite.cluster.ClusterNode
import org.apache.ignite.configuration.CacheConfiguration
import org.apache.ignite.internal.util.lang.GridFunc.contains
+import org.apache.spark.Partition
import org.apache.spark.sql.catalyst.catalog.SessionCatalog
import scala.collection.JavaConversions._
+import scala.collection.mutable.ArrayBuffer
package object impl {
/**
@@ -129,4 +132,47 @@ package object impl {
*/
def isKeyColumn(table: QueryEntity, column: String): Boolean =
contains(table.getKeyFields, column) || equalsIgnoreCase(table.getKeyFieldName, column)
+
+ /**
+ * Computes Spark partitions for a given cache.
+ *
+ * @param ic Ignite context.
+ * @param cacheName Cache name.
+ * @return Array of IgniteDataFramePartition.
+ */
+ def calcPartitions(ic: IgniteContext, cacheName: String): Array[Partition] = {
+ val cache = ic.ignite().cache[Any, Any](cacheName)
+
+ val ccfg = cache.getConfiguration(classOf[CacheConfiguration[Any, Any]])
+
+ if (ccfg.getCacheMode == CacheMode.REPLICATED) {
+ val serverNodes = ic.ignite().cluster().forCacheNodes(cacheName).forServers().nodes()
+
+ Array(IgniteDataFramePartition(0, serverNodes.head, Stream.from(0).take(1024).toList))
+ }
+ else {
+ val aff = ic.ignite().affinity(cacheName)
+
+ val parts = aff.partitions()
+
+ val nodesToParts = (0 until parts).foldLeft(Map[ClusterNode, ArrayBuffer[Int]]()) {
+ case (nodeToParts, ignitePartIdx) ⇒
+ val primary = aff.mapPartitionToPrimaryAndBackups(ignitePartIdx).head
+
+ if (nodeToParts.contains(primary)) {
+ nodeToParts(primary) += ignitePartIdx
+
+ nodeToParts
+ }
+ else
+ nodeToParts + (primary → ArrayBuffer[Int](ignitePartIdx))
+ }
+
+ val partitions = nodesToParts.zipWithIndex.map { case ((node, nodesParts), i) ⇒
+ IgniteDataFramePartition(i, node, nodesParts.toList)
+ }
+
+ partitions.toArray
+ }
+ }
}
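
A sketch of how the computed partitions could be inspected (the field names of
IgniteDataFramePartition are assumed from the constructor calls above; this is
illustrative, not part of the patch):

    // One Spark partition per primary node; a REPLICATED cache yields a single
    // partition placed on one server node, as calcPartitions shows above.
    val parts: Array[Partition] = calcPartitions(ic, "personCache")

    parts.foreach { case p: IgniteDataFramePartition ⇒
      // Assumed fields: sparkPartIdx: Int, primary: ClusterNode, igniteParts: List[Int].
      println(s"partition ${p.sparkPartIdx}: node ${p.primary.id()}, ${p.igniteParts.size} Ignite partitions")
    }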
http://git-wip-us.apache.org/repos/asf/ignite/blob/10a4c48b/modules/spark/src/main/scala/org/apache/spark/sql/ignite/IgniteOptimization.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/scala/org/apache/spark/sql/ignite/IgniteOptimization.scala b/modules/spark/src/main/scala/org/apache/spark/sql/ignite/IgniteOptimization.scala
new file mode 100644
index 0000000..b23cd6f
--- /dev/null
+++ b/modules/spark/src/main/scala/org/apache/spark/sql/ignite/IgniteOptimization.scala
@@ -0,0 +1,436 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.ignite
+
+import org.apache.ignite.IgniteException
+import org.apache.ignite.spark.impl.{IgniteSQLAccumulatorRelation, IgniteSQLRelation, sqlCacheName}
+import org.apache.ignite.spark.impl.optimization.{accumulator, _}
+import org.apache.ignite.spark.impl.optimization.accumulator._
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.datasources.LogicalRelation
+
+/**
+ * Query plan optimization for Ignite-based queries.
+ */
+object IgniteOptimization extends Rule[LogicalPlan] with Logging {
+ /** @inheritdoc */
+ override def apply(plan: LogicalPlan): LogicalPlan = {
+ logDebug("")
+ logDebug("== Plan Before Ignite Operator Push Down ==")
+ logDebug(plan.toString())
+
+ val transformed = fixAmbiguousOutput(pushDownOperators(plan))
+
+ logDebug("")
+ logDebug("== Plan After Ignite Operator Push Down ==")
+ logDebug(transformed.toString())
+
+ makeIgniteAccRelation(transformed)
+ }
+
+ /**
+ * Change query plan by accumulating query parts supported by Ignite into `QueryAccumulator`.
+ *
+ * @param plan Query plan.
+ * @return Transformed plan.
+ */
+ private def pushDownOperators(plan: LogicalPlan): LogicalPlan = {
+ val aliasIndexIterator = Stream.from(1).iterator
+
+ //Flag to indicate that some step was skipped due to an unsupported expression.
+ //When it is true we have to skip the entire transformation of the higher-level nodes.
+ var stepSkipped = true
+
+ //Apply optimization rules to the tree nodes from the bottom up.
+ plan.transformUp {
+ //We found a basic node to transform.
+ //We create a new accumulator and proceed to the upper layers.
+ case LogicalRelation(igniteSqlRelation: IgniteSQLRelation[_, _], output, _catalogTable) ⇒
+ //Clear the flag to optimize each statement separately.
+ stepSkipped = false
+
+ val igniteQueryContext = IgniteQueryContext(
+ igniteContext = igniteSqlRelation.ic,
+ sqlContext = igniteSqlRelation.sqlContext,
+ catalogTable = _catalogTable,
+ aliasIndex = aliasIndexIterator,
+ cacheName =
+ sqlCacheName(igniteSqlRelation.ic.ignite(), igniteSqlRelation.tableName)
+ .getOrElse(throw new IgniteException("Unknown table")))
+
+ //LogicalRelation is the bottommost TreeNode in a LogicalPlan.
+ //We replace it with an accumulator.
+ //We push all supported SQL operators into it at the higher tree levels.
+ SingleTableSQLAccumulator(
+ igniteQueryContext = igniteQueryContext,
+ table = Some(igniteSqlRelation.tableName),
+ tableExpression = None,
+ outputExpressions = output.map(attr ⇒ attr.withQualifier(Some(igniteSqlRelation.tableName))))
+
+ case project: Project if !stepSkipped && exprsAllowed(project.projectList) ⇒
+ //The Project layer just changes the output of the current query.
+ project.child match {
+ case acc: SelectAccumulator ⇒
+ acc.withOutputExpressions(
+ substituteExpressions(project.projectList, acc.outputExpressions))
+
+ case _ ⇒
+ throw new IgniteException("stepSkipped == true but child is not SelectAccumulator")
+ }
+
+ case sort: Sort if !stepSkipped && isSortPushDownAllowed(sort.order, sort.global) ⇒
+ sort.child match {
+ case acc: QueryAccumulator ⇒
+ acc.withOrderBy(sort.order)
+
+ case _ ⇒
+ throw new IgniteException("stepSkipped == true but child is not SelectAccumulator")
+ }
+
+ case filter: Filter if !stepSkipped && exprsAllowed(filter.condition) ⇒
+
+ filter.child match {
+ case acc: SelectAccumulator ⇒
+ if (hasAggregateInside(filter.condition) || acc.groupBy.isDefined)
+ acc.withHaving(acc.having.getOrElse(Nil) :+ filter.condition)
+ else
+ acc.withWhere(acc.where.getOrElse(Nil) :+ filter.condition)
+
+ case _ ⇒
+ throw new IgniteException("stepSkipped == true but child is not SelectAccumulator")
+ }
+
+ case agg: Aggregate
+ if !stepSkipped && exprsAllowed(agg.groupingExpressions) && exprsAllowed(agg.aggregateExpressions) ⇒
+
+ agg.child match {
+ case acc: SelectAccumulator ⇒
+ if (acc.groupBy.isDefined) {
+ val tableAlias = acc.igniteQueryContext.uniqueTableAlias
+
+ accumulator.SingleTableSQLAccumulator(
+ igniteQueryContext = acc.igniteQueryContext,
+ table = None,
+ tableExpression = Some((acc, tableAlias)),
+ outputExpressions = agg.aggregateExpressions)
+ }
+ else
+ acc
+ .withGroupBy(agg.groupingExpressions)
+ .withOutputExpressions(
+ substituteExpressions(agg.aggregateExpressions, acc.outputExpressions))
+
+ case acc: QueryAccumulator ⇒
+ val tableAlias = acc.igniteQueryContext.uniqueTableAlias
+
+ accumulator.SingleTableSQLAccumulator(
+ igniteQueryContext = acc.igniteQueryContext,
+ table = None,
+ tableExpression = Some((acc, tableAlias)),
+ outputExpressions = agg.aggregateExpressions)
+
+ case _ ⇒
+ throw new IgniteException("stepSkipped == true but child is not SelectAccumulator")
+ }
+
+ case limit: LocalLimit if !stepSkipped && exprsAllowed(limit.limitExpr) ⇒
+ limit.child match {
+ case acc: SelectAccumulator ⇒
+ acc.withLocalLimit(limit.limitExpr)
+
+ case _ ⇒
+ throw new IgniteException("stepSkipped == true but child is not SelectAccumulator")
+ }
+
+ case limit: GlobalLimit if !stepSkipped && exprsAllowed(limit.limitExpr) ⇒
+ limit.child.transformUp {
+ case acc: SelectAccumulator ⇒
+ acc.withLimit(limit.limitExpr)
+
+ case _ ⇒
+ throw new IgniteException("stepSkipped == true but child is not SelectAccumulator")
+ }
+
+ case union: Union if !stepSkipped && isAllChildrenOptimized(union.children) ⇒
+ val first = union.children.head.asInstanceOf[QueryAccumulator]
+
+ val subQueries = union.children.map(_.asInstanceOf[QueryAccumulator])
+
+ UnionSQLAccumulator(
+ first.igniteQueryContext,
+ subQueries,
+ subQueries.head.output)
+
+ case join: Join
+ if !stepSkipped && isAllChildrenOptimized(Seq(join.left, join.right)) &&
+ join.condition.forall(exprsAllowed) ⇒
+
+ val left = join.left.asInstanceOf[QueryAccumulator]
+
+ val (leftOutput, leftAlias) =
+ if (!isSimpleTableAcc(left)) {
+ val tableAlias = left.igniteQueryContext.uniqueTableAlias
+
+ (left.output, Some(tableAlias))
+ }
+ else
+ (left.output, None)
+
+ val right = join.right.asInstanceOf[QueryAccumulator]
+
+ val (rightOutput, rightAlias) =
+ if (!isSimpleTableAcc(right) ||
+ leftAlias.getOrElse(left.qualifier) == right.qualifier) {
+ val tableAlias = right.igniteQueryContext.uniqueTableAlias
+
+ (right.output, Some(tableAlias))
+ }
+ else
+ (right.output, None)
+
+ JoinSQLAccumulator(
+ left.igniteQueryContext,
+ left,
+ right,
+ join.joinType,
+ leftOutput ++ rightOutput,
+ join.condition,
+ leftAlias,
+ rightAlias)
+
+ case unknown ⇒
+ stepSkipped = true
+
+ unknown
+ }
+ }
+
+ /**
+ * Changes qualifiers for ambiguous column names.
+ *
+ * @param plan Query plan.
+ * @return Transformed plan.
+ */
+ private def fixAmbiguousOutput(plan: LogicalPlan): LogicalPlan = plan.transformDown {
+ case acc: SingleTableSQLAccumulator if acc.children.exists(_.isInstanceOf[JoinSQLAccumulator]) ⇒
+ val fixedChildOutput =
+ fixAmbiguousOutput(acc.tableExpression.get._1.outputExpressions, acc.igniteQueryContext)
+
+ val newOutput = substituteExpressions(acc.outputExpressions, fixedChildOutput, changeOnlyName = true)
+
+ acc.copy(
+ outputExpressions = newOutput,
+ where = acc.where.map(
+ substituteExpressions(_, fixedChildOutput, changeOnlyName = true)),
+ groupBy = acc.groupBy.map(
+ substituteExpressions(_, fixedChildOutput, changeOnlyName = true)),
+ having = acc.having.map(
+ substituteExpressions(_, fixedChildOutput, changeOnlyName = true)),
+ limit = acc.limit.map(
+ substituteExpression(_, fixedChildOutput, changeOnlyName = true)),
+ localLimit = acc.localLimit.map(
+ substituteExpression(_, fixedChildOutput, changeOnlyName = true)),
+ orderBy = acc.orderBy.map(
+ substituteExpressions(_, fixedChildOutput, changeOnlyName = true)))
+
+ case acc: JoinSQLAccumulator
+ if acc.left.isInstanceOf[JoinSQLAccumulator] || acc.right.isInstanceOf[JoinSQLAccumulator] ⇒
+ val leftFixed = acc.left match {
+ case leftJoin: JoinSQLAccumulator ⇒
+ val fixedChildOutput = fixAmbiguousOutput(acc.left.outputExpressions, acc.igniteQueryContext)
+
+ val newOutput =
+ substituteExpressions(acc.outputExpressions, fixedChildOutput, changeOnlyName = true)
+
+ acc.copy(
+ outputExpressions = newOutput,
+ left = leftJoin.copy(outputExpressions = fixedChildOutput),
+ condition = acc.condition.map(
+ substituteExpression(_, fixedChildOutput, changeOnlyName = true)),
+ where = acc.where.map(
+ substituteExpressions(_, fixedChildOutput, changeOnlyName = true)),
+ groupBy = acc.groupBy.map(
+ substituteExpressions(_, fixedChildOutput, changeOnlyName = true)),
+ having = acc.having.map(
+ substituteExpressions(_, fixedChildOutput, changeOnlyName = true)),
+ limit = acc.limit.map(
+ substituteExpression(_, fixedChildOutput, changeOnlyName = true)),
+ localLimit = acc.localLimit.map(
+ substituteExpression(_, fixedChildOutput, changeOnlyName = true)),
+ orderBy = acc.orderBy.map(
+ substituteExpressions(_, fixedChildOutput, changeOnlyName = true)))
+
+ case _ ⇒ acc
+ }
+
+ val fixed = leftFixed.right match {
+ case rightJoin: JoinSQLAccumulator ⇒
+ val fixedChildOutput =
+ fixAmbiguousOutput(leftFixed.outputExpressions, leftFixed.igniteQueryContext)
+
+ val newOutput = substituteExpressions(leftFixed.outputExpressions, fixedChildOutput)
+
+ leftFixed.copy(
+ outputExpressions = newOutput,
+ right = rightJoin.copy(outputExpressions = fixedChildOutput),
+ condition = acc.condition.map(
+ substituteExpression(_, fixedChildOutput, changeOnlyName = true)),
+ where = acc.where.map(
+ substituteExpressions(_, fixedChildOutput, changeOnlyName = true)),
+ groupBy = acc.groupBy.map(
+ substituteExpressions(_, fixedChildOutput, changeOnlyName = true)),
+ having = acc.having.map(
+ substituteExpressions(_, fixedChildOutput, changeOnlyName = true)),
+ limit = acc.limit.map(
+ substituteExpression(_, fixedChildOutput, changeOnlyName = true)),
+ localLimit = acc.localLimit.map(
+ substituteExpression(_, fixedChildOutput, changeOnlyName = true)),
+ orderBy = acc.orderBy.map(
+ substituteExpressions(_, fixedChildOutput, changeOnlyName = true)))
+
+ case _ ⇒ leftFixed
+ }
+
+ fixed.copy(
+ condition = acc.condition.map(
+ substituteExpression(_, acc.outputExpressions, changeOnlyName = true)),
+ where = acc.where.map(
+ substituteExpressions(_, acc.outputExpressions, changeOnlyName = true)),
+ groupBy = acc.groupBy.map(
+ substituteExpressions(_, acc.outputExpressions, changeOnlyName = true)),
+ having = acc.having.map(
+ substituteExpressions(_, acc.outputExpressions, changeOnlyName = true)),
+ limit = acc.limit.map(
+ substituteExpression(_, acc.outputExpressions, changeOnlyName = true)),
+ localLimit = acc.localLimit.map(
+ substituteExpression(_, acc.outputExpressions, changeOnlyName = true)),
+ orderBy = acc.orderBy.map(
+ substituteExpressions(_, acc.outputExpressions, changeOnlyName = true)))
+
+ case unknown ⇒
+ unknown
+ }
+
+ private def fixAmbiguousOutput(exprs: Seq[NamedExpression], ctx: IgniteQueryContext): Seq[NamedExpression] =
+ exprs.foldLeft((Set[String](), Set[NamedExpression]())) {
+ case ((uniqueNames, fixed), next) ⇒
+ if (uniqueNames(next.name))
+ (uniqueNames, fixed + Alias(next, ctx.uniqueColumnAlias(next))(exprId = next.exprId))
+ else
+ (uniqueNames + next.name, fixed + next)
+ }._2.toSeq
+
+ /**
+ * Substitutes each `QueryAccumulator` with a `LogicalRelation` containing an `IgniteSQLAccumulatorRelation`.
+ *
+ * @param plan Query plan.
+ * @return Transformed plan.
+ */
+ private def makeIgniteAccRelation(plan: LogicalPlan): LogicalPlan =
+ plan.transformDown {
+ case acc: QueryAccumulator ⇒
+ new LogicalRelation (
+ relation = IgniteSQLAccumulatorRelation(acc),
+ output = acc.outputExpressions.map(toAttributeReference(_, Seq.empty)),
+ catalogTable = acc.igniteQueryContext.catalogTable)
+ }
+
+ /**
+ * @param order Order.
+ * @param global True if the ordering applies to the entire result set, false if it is per-partition.
+ * @return True if sort can be pushed down to Ignite, false otherwise.
+ */
+ private def isSortPushDownAllowed(order: Seq[SortOrder], global: Boolean): Boolean =
+ global && order.map(_.child).forall(exprsAllowed)
+
+ /**
+ * @param children Plans to check.
+ * @return True if all plans are `QueryAccumulator`, false otherwise.
+ */
+ private def isAllChildrenOptimized(children: Seq[LogicalPlan]): Boolean =
+ children.forall {
+ case _: QueryAccumulator ⇒
+ true
+
+ case _ ⇒
+ false
+ }
+
+ /**
+ * Replaces each expression from `exprs` with the expression having the same `exprId` from `substitution`.
+ *
+ * @param exprs Expressions to substitute.
+ * @param substitution Expressions for substitution.
+ * @param changeOnlyName If true, substitutes only the expression name.
+ * @tparam T Concrete expression type.
+ * @return Substituted expressions.
+ */
+ private def substituteExpressions[T <: Expression](exprs: Seq[T], substitution: Seq[NamedExpression],
+ changeOnlyName: Boolean = false): Seq[T] = {
+
+ exprs.map(substituteExpression(_, substitution, changeOnlyName))
+ }
+
+ private def substituteExpression[T <: Expression](expr: T, substitution: Seq[NamedExpression],
+ changeOnlyName: Boolean): T = expr match {
+ case ne: NamedExpression ⇒
+ substitution.find(_.exprId == ne.exprId) match {
+ case Some(found) ⇒
+ if (!changeOnlyName)
+ found.asInstanceOf[T]
+ else ne match {
+ case alias: Alias ⇒
+ Alias(
+ AttributeReference(
+ found.name,
+ found.dataType,
+ nullable = found.nullable,
+ metadata = found.metadata)(
+ exprId = found.exprId,
+ qualifier = found.qualifier,
+ isGenerated = found.isGenerated),
+ alias.name) (
+ exprId = alias.exprId,
+ qualifier = alias.qualifier,
+ explicitMetadata = alias.explicitMetadata,
+ isGenerated = alias.isGenerated).asInstanceOf[T]
+
+ case attr: AttributeReference ⇒
+ attr.copy(name = found.name)(
+ exprId = found.exprId,
+ qualifier = found.qualifier,
+ isGenerated = found.isGenerated).asInstanceOf[T]
+
+ case _ ⇒ ne.asInstanceOf[T]
+ }
+
+ case None ⇒
+ expr.withNewChildren(
+ substituteExpressions(expr.children, substitution, changeOnlyName)).asInstanceOf[T]
+ }
+
+ case _ ⇒
+ expr.withNewChildren(
+ substituteExpressions(expr.children, substitution, changeOnlyName)).asInstanceOf[T]
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/10a4c48b/modules/spark/src/main/scala/org/apache/spark/sql/ignite/IgniteSparkSession.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/scala/org/apache/spark/sql/ignite/IgniteSparkSession.scala b/modules/spark/src/main/scala/org/apache/spark/sql/ignite/IgniteSparkSession.scala
index 8860590..1fccc3a 100644
--- a/modules/spark/src/main/scala/org/apache/spark/sql/ignite/IgniteSparkSession.scala
+++ b/modules/spark/src/main/scala/org/apache/spark/sql/ignite/IgniteSparkSession.scala
@@ -66,8 +66,14 @@ class IgniteSparkSession private(ic: IgniteContext, proxy: SparkSession) extends
new IgniteSharedState(ic, sparkContext)
/** @inheritdoc */
- @transient override lazy val sessionState: SessionState =
- new SessionStateBuilder(self, None).build()
+ @transient override lazy val sessionState: SessionState = {
+ val sessionState = new SessionStateBuilder(self, None).build()
+
+ sessionState.experimentalMethods.extraOptimizations =
+ sessionState.experimentalMethods.extraOptimizations :+ IgniteOptimization
+
+ sessionState
+ }
/** @inheritdoc */
@transient override lazy val conf: RuntimeConfig = proxy.conf
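
With this change IgniteOptimization is appended to the session's extraOptimizations
when sessionState is first materialized. For reference, the same rule can be
attached to a plain SparkSession by hand, mirroring the builder code above (a sketch):

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.ignite.IgniteOptimization

    val session = SparkSession.builder().master("local").getOrCreate()

    session.sessionState.experimentalMethods.extraOptimizations =
      session.sessionState.experimentalMethods.extraOptimizations :+ IgniteOptimization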
http://git-wip-us.apache.org/repos/asf/ignite/blob/10a4c48b/modules/spark/src/test/scala/org/apache/ignite/spark/AbstractDataFrameSpec.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/test/scala/org/apache/ignite/spark/AbstractDataFrameSpec.scala b/modules/spark/src/test/scala/org/apache/ignite/spark/AbstractDataFrameSpec.scala
index 8613592..29a4e6f 100644
--- a/modules/spark/src/test/scala/org/apache/ignite/spark/AbstractDataFrameSpec.scala
+++ b/modules/spark/src/test/scala/org/apache/ignite/spark/AbstractDataFrameSpec.scala
@@ -19,13 +19,16 @@ package org.apache.ignite.spark
import org.apache.ignite.{Ignite, Ignition}
import org.apache.ignite.configuration.{CacheConfiguration, IgniteConfiguration}
-import org.apache.spark.sql.SparkSession
-import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll, FunSpec, Matchers}
+import org.apache.spark.sql.{DataFrame, Row, SparkSession}
+import org.scalatest._
import java.lang.{Long ⇒ JLong}
import org.apache.ignite.cache.query.SqlFieldsQuery
import org.apache.ignite.cache.query.annotations.QuerySqlField
-import org.apache.ignite.internal.IgnitionEx
+import org.apache.ignite.internal.IgnitionEx.loadConfiguration
+import org.apache.ignite.spark.AbstractDataFrameSpec.configuration
+import org.apache.ignite.spark.impl.IgniteSQLAccumulatorRelation
+import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.ignite.spark.AbstractDataFrameSpec._
import scala.annotation.meta.field
@@ -33,12 +36,13 @@ import scala.reflect.ClassTag
/**
*/
-abstract class AbstractDataFrameSpec extends FunSpec with Matchers with BeforeAndAfterAll with BeforeAndAfter {
+abstract class AbstractDataFrameSpec extends FunSpec with Matchers with BeforeAndAfterAll with BeforeAndAfter
+ with Assertions {
var spark: SparkSession = _
var client: Ignite = _
- override protected def beforeAll() = {
+ override protected def beforeAll(): Unit = {
for (i ← 0 to 3)
Ignition.start(configuration("grid-" + i, client = false))
@@ -47,7 +51,7 @@ abstract class AbstractDataFrameSpec extends FunSpec with Matchers with BeforeAn
createSparkSession()
}
- override protected def afterAll() = {
+ override protected def afterAll(): Unit = {
Ignition.stop("client", false)
for (i ← 0 to 3)
@@ -108,6 +112,7 @@ abstract class AbstractDataFrameSpec extends FunSpec with Matchers with BeforeAn
cache.query(qry.setArgs(1L.asInstanceOf[JLong], "Forest Hill")).getAll
cache.query(qry.setArgs(2L.asInstanceOf[JLong], "Denver")).getAll
cache.query(qry.setArgs(3L.asInstanceOf[JLong], "St. Petersburg")).getAll
+ cache.query(qry.setArgs(4L.asInstanceOf[JLong], "St. Petersburg")).getAll
}
def createEmployeeCache(client: Ignite, cacheName: String): Unit = {
@@ -119,6 +124,31 @@ abstract class AbstractDataFrameSpec extends FunSpec with Matchers with BeforeAn
cache.put("key2", Employee(2, "Sarah Connor", 32, 10000))
cache.put("key3", Employee(3, "Arnold Schwarzenegger", 27, 1000))
}
+
+ def checkQueryData[T](res: DataFrame, expectedRes: Product)
+ (implicit ord: T ⇒ Ordered[T]): Unit =
+ checkQueryData(res, expectedRes, _.getAs[T](0))
+
+ def checkQueryData[Ordered](res: DataFrame, expectedRes: Product, sorter: Row => Ordered)
+ (implicit ord: Ordering[Ordered]): Unit = {
+ val data = res.rdd.collect.sortBy(sorter)
+
+ for(i ← 0 until expectedRes.productArity) {
+ val row = data(i)
+
+ if (row.size == 1)
+ assert(row(0) == expectedRes.productElement(i), s"row[$i, 0] = ${row(0)} should be equal to ${expectedRes.productElement(i)}")
+ else {
+ val expectedRow: Product = expectedRes.productElement(i).asInstanceOf[Product]
+
+ assert(expectedRow.productArity == row.size, s"Row sizes should be equal, but expected.size=${expectedRow.productArity} " +
+ s"and row.size=${row.size}")
+
+ for (j ← 0 until expectedRow.productArity)
+ assert(row(j) == expectedRow.productElement(j), s"row[$i, $j] = ${row(j)} should be equal to ${expectedRow.productElement(j)}")
+ }
+ }
+ }
}
object AbstractDataFrameSpec {
@@ -135,7 +165,7 @@ object AbstractDataFrameSpec {
val PERSON_TBL_NAME_2 = "person2"
def configuration(igniteInstanceName: String, client: Boolean): IgniteConfiguration = {
- val cfg = IgnitionEx.loadConfiguration(TEST_CONFIG_FILE).get1()
+ val cfg = loadConfiguration(TEST_CONFIG_FILE).get1()
cfg.setClientMode(client)
@@ -167,6 +197,30 @@ object AbstractDataFrameSpec {
}
/**
+ * @param df Data frame.
+ * @param qry SQL Query.
+ */
+ def checkOptimizationResult(df: DataFrame, qry: String = ""): Unit = {
+ df.explain(true)
+
+ val plan = df.queryExecution.optimizedPlan
+
+ val cnt = plan.collectLeaves.count {
+ case LogicalRelation(relation: IgniteSQLAccumulatorRelation[_, _], _, _) ⇒
+ if (qry != "")
+ assert(qry.toLowerCase == relation.acc.compileQuery().toLowerCase,
+ s"Generated query should be equal to expected.\nexpected - $qry\ngenerated - ${relation.acc.compileQuery()}")
+
+ true
+
+ case _ ⇒
+ false
+ }
+
+ assert(cnt != 0, "Plan should contain IgniteSQLAccumulatorRelation")
+ }
+
+ /**
* Encloses a closure so that it doesn't capture the outer object (default Scala behaviour) when serialized.
*/
def enclose[E, R](enclosed: E)(func: E => R): R = func(enclosed)
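
A sketch of how the two helpers above combine in the optimization specs that
follow (the query text and expected rows are illustrative):

    // Assert that the optimized plan contains an IgniteSQLAccumulatorRelation
    // leaf whose compiled SQL matches the expected query (case-insensitive),
    // then compare the returned rows with the expected tuple.
    val df = igniteSession.sql("SELECT id, name FROM city WHERE id = 1")

    checkOptimizationResult(df, "SELECT id, name FROM city WHERE id = 1")

    checkQueryData(df, Tuple1((1L, "Forest Hill")), _.getAs[Long](0))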
http://git-wip-us.apache.org/repos/asf/ignite/blob/10a4c48b/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteCatalogSpec.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteCatalogSpec.scala b/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteCatalogSpec.scala
index 6077211..d87d234 100644
--- a/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteCatalogSpec.scala
+++ b/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteCatalogSpec.scala
@@ -58,7 +58,7 @@ class IgniteCatalogSpec extends AbstractDataFrameSpec {
it("Should provide ability to query SQL table without explicit registration") {
val res = igniteSession.sql("SELECT id, name FROM city").rdd
- res.count should equal(3)
+ res.count should equal(4)
val cities = res.collect.sortBy(_.getAs[JLong]("id"))
@@ -66,7 +66,8 @@ class IgniteCatalogSpec extends AbstractDataFrameSpec {
Array(
(1, "Forest Hill"),
(2, "Denver"),
- (3, "St. Petersburg")
+ (3, "St. Petersburg"),
+ (4, "St. Petersburg")
)
)
}
@@ -136,7 +137,7 @@ class IgniteCatalogSpec extends AbstractDataFrameSpec {
createEmployeeCache(client, EMPLOYEE_CACHE_NAME)
- val configProvider = enclose(null) (x ⇒ () ⇒ {
+ val configProvider = enclose(null) (_ ⇒ () ⇒ {
val cfg = IgnitionEx.loadConfiguration(TEST_CONFIG_FILE).get1()
cfg.setClientMode(true)
http://git-wip-us.apache.org/repos/asf/ignite/blob/10a4c48b/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteDataFrameSchemaSpec.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteDataFrameSchemaSpec.scala b/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteDataFrameSchemaSpec.scala
index cdd26cd..c5df901 100644
--- a/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteDataFrameSchemaSpec.scala
+++ b/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteDataFrameSchemaSpec.scala
@@ -17,9 +17,6 @@
package org.apache.ignite.spark
-import java.lang.{Integer ⇒ JInteger, String ⇒ JString}
-
-import org.apache.ignite.Ignite
import org.apache.ignite.spark.AbstractDataFrameSpec._
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.types._
@@ -45,7 +42,7 @@ class IgniteDataFrameSchemaSpec extends AbstractDataFrameSpec {
("IS_RESIDENT", BooleanType, true),
("SALARY", DoubleType, true),
("PENSION", DoubleType, true),
- ("ACCOUNT", DecimalType(10, 0), true),
+ ("ACCOUNT", IgniteRDD.DECIMAL, true),
("AGE", IntegerType, true),
("ID", LongType, false),
("CITY_ID", LongType, false))
http://git-wip-us.apache.org/repos/asf/ignite/blob/10a4c48b/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteDataFrameSuite.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteDataFrameSuite.scala b/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteDataFrameSuite.scala
index 2ceb44a..b3f7026 100644
--- a/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteDataFrameSuite.scala
+++ b/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteDataFrameSuite.scala
@@ -28,5 +28,12 @@ class IgniteDataFrameSuite extends Suites (
new IgniteSQLDataFrameWriteSpec,
new IgniteSQLDataFrameIgniteSessionWriteSpec,
new IgniteDataFrameWrongConfigSpec,
- new IgniteCatalogSpec
+ new IgniteCatalogSpec,
+ new IgniteOptimizationSpec,
+ new IgniteOptimizationStringFuncSpec,
+ new IgniteOptimizationMathFuncSpec,
+ new IgniteOptimizationAggregationFuncSpec,
+ new IgniteOptimizationSystemFuncSpec,
+ new IgniteOptimizationJoinSpec,
+ new IgniteOptimizationDisableEnableSpec
)
http://git-wip-us.apache.org/repos/asf/ignite/blob/10a4c48b/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteOptimizationAggregationFuncSpec.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteOptimizationAggregationFuncSpec.scala b/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteOptimizationAggregationFuncSpec.scala
new file mode 100644
index 0000000..d2527c8
--- /dev/null
+++ b/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteOptimizationAggregationFuncSpec.scala
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spark
+
+import org.apache.ignite.Ignite
+import org.apache.ignite.cache.query.SqlFieldsQuery
+import org.junit.runner.RunWith
+import org.scalatest.junit.JUnitRunner
+import java.lang.{Double ⇒ JDouble, Long ⇒ JLong}
+
+import org.apache.ignite.internal.IgnitionEx
+import org.apache.ignite.spark.AbstractDataFrameSpec.{DEFAULT_CACHE, TEST_CONFIG_FILE, checkOptimizationResult, enclose}
+import org.apache.spark.sql.ignite.IgniteSparkSession
+
+/**
+ * Tests for supported aggregate function push-down.
+ */
+@RunWith(classOf[JUnitRunner])
+class IgniteOptimizationAggregationFuncSpec extends AbstractDataFrameSpec {
+ var igniteSession: IgniteSparkSession = _
+
+ describe("Supported optimized aggregation functions") {
+ it("COUNT") {
+ val df = igniteSession.sql("SELECT count(*) FROM numbers")
+
+ checkOptimizationResult(df, "SELECT count(1) FROM numbers")
+
+ val data = Tuple1(21)
+
+ checkQueryData(df, data)
+ }
+
+ it("AVG - DECIMAL") {
+ //TODO: write me
+ }
+
+ it("AVG - DOUBLE") {
+ val df = igniteSession.sql("SELECT AVG(val) FROM numbers WHERE id <= 3")
+
+ checkOptimizationResult(df, "SELECT AVG(val) FROM numbers WHERE id IS NOT NULL and id <= 3")
+
+ val data = Tuple1(.5)
+
+ checkQueryData(df, data)
+ }
+
+ it("MIN - DOUBLE") {
+ val df = igniteSession.sql("SELECT MIN(val) FROM numbers")
+
+ checkOptimizationResult(df, "SELECT MIN(val) FROM numbers")
+
+ val data = Tuple1(-1.0)
+
+ checkQueryData(df, data)
+ }
+
+ it("MAX - DOUBLE") {
+ val df = igniteSession.sql("SELECT MAX(val) FROM numbers")
+
+ checkOptimizationResult(df, "SELECT MAX(val) FROM numbers")
+
+ val data = Tuple1(180.0)
+
+ checkQueryData(df, data)
+ }
+
+ it("SUM - DOUBLE") {
+ val df = igniteSession.sql("SELECT SUM(val) FROM numbers WHERE id <= 3")
+
+ checkOptimizationResult(df, "SELECT SUM(val) FROM numbers WHERE id IS NOT NULL and id <= 3")
+
+ val data = Tuple1(1.5)
+
+ checkQueryData(df, data)
+ }
+
+ it("SUM - DECIMAL - 1") {
+ val df = igniteSession.sql("SELECT SUM(decimal_val) FROM numbers WHERE id IN (18, 19, 20)")
+
+ checkOptimizationResult(df, "SELECT SUM(decimal_val) FROM numbers WHERE id IN (18, 19, 20)")
+
+ df.printSchema()
+
+ val data = Tuple1(new java.math.BigDecimal(10.5).setScale(3))
+
+ checkQueryData(df, data)
+ }
+
+ it("SUM - DECIMAL - 2") {
+ val df = igniteSession.sql("SELECT SUM(decimal_val) FROM numbers WHERE id IN (18, 19, 20, 21)")
+
+ checkOptimizationResult(df, "SELECT SUM(decimal_val) FROM numbers WHERE id IN (18, 19, 20, 21)")
+
+ val data = Tuple1(new java.math.BigDecimal(15).setScale(3))
+
+ checkQueryData(df, data)
+ }
+
+ it("SUM - LONG") {
+ val df = igniteSession.sql("SELECT SUM(int_val) FROM numbers WHERE id in (15, 16, 17)")
+
+ checkOptimizationResult(df, "SELECT CAST(SUM(int_val) AS BIGINT) as \"SUM(int_val)\" " +
+ "FROM numbers WHERE id in (15, 16, 17)")
+
+ val data = Tuple1(6L)
+
+ checkQueryData(df, data)
+ }
+ }
+
+ def createNumberTable(client: Ignite, cacheName: String): Unit = {
+ val cache = client.cache(cacheName)
+
+ cache.query(new SqlFieldsQuery(
+ """
+ | CREATE TABLE numbers (
+ | id LONG,
+ | val DOUBLE,
+ | int_val LONG,
+ | decimal_val DECIMAL(5, 5),
+ | PRIMARY KEY (id)) WITH "backups=1"
+ """.stripMargin)).getAll
+
+ var qry = new SqlFieldsQuery("INSERT INTO numbers (id, val) values (?, ?)")
+
+ cache.query(qry.setArgs(1L.asInstanceOf[JLong], .0.asInstanceOf[JDouble])).getAll
+ cache.query(qry.setArgs(2L.asInstanceOf[JLong], .5.asInstanceOf[JDouble])).getAll
+ cache.query(qry.setArgs(3L.asInstanceOf[JLong], 1.0.asInstanceOf[JDouble])).getAll
+ cache.query(qry.setArgs(4L.asInstanceOf[JLong], 2.0.asInstanceOf[JDouble])).getAll
+ cache.query(qry.setArgs(5L.asInstanceOf[JLong], 4.0.asInstanceOf[JDouble])).getAll
+ cache.query(qry.setArgs(6L.asInstanceOf[JLong], -0.5.asInstanceOf[JDouble])).getAll
+ cache.query(qry.setArgs(7L.asInstanceOf[JLong], -1.0.asInstanceOf[JDouble])).getAll
+ cache.query(qry.setArgs(8L.asInstanceOf[JLong], 42.0.asInstanceOf[JDouble])).getAll
+ cache.query(qry.setArgs(9L.asInstanceOf[JLong], .51.asInstanceOf[JDouble])).getAll
+ cache.query(qry.setArgs(10L.asInstanceOf[JLong], .49.asInstanceOf[JDouble])).getAll
+ cache.query(qry.setArgs(11L.asInstanceOf[JLong], 100.0.asInstanceOf[JDouble])).getAll
+ cache.query(qry.setArgs(12L.asInstanceOf[JLong], (Math.E*Math.E).asInstanceOf[JDouble])).getAll
+ cache.query(qry.setArgs(13L.asInstanceOf[JLong], Math.PI.asInstanceOf[JDouble])).getAll
+ cache.query(qry.setArgs(14L.asInstanceOf[JLong], 180.0.asInstanceOf[JDouble])).getAll
+
+ qry = new SqlFieldsQuery("INSERT INTO numbers (id, int_val) values (?, ?)")
+
+ cache.query(qry.setArgs(15L.asInstanceOf[JLong], 1L.asInstanceOf[JLong])).getAll
+ cache.query(qry.setArgs(16L.asInstanceOf[JLong], 2L.asInstanceOf[JLong])).getAll
+ cache.query(qry.setArgs(17L.asInstanceOf[JLong], 3L.asInstanceOf[JLong])).getAll
+
+ qry = new SqlFieldsQuery("INSERT INTO numbers (id, decimal_val) values (?, ?)")
+
+ cache.query(qry.setArgs(18L.asInstanceOf[JLong], new java.math.BigDecimal(2.5))).getAll
+ cache.query(qry.setArgs(19L.asInstanceOf[JLong], new java.math.BigDecimal(3.5))).getAll
+ cache.query(qry.setArgs(20L.asInstanceOf[JLong], new java.math.BigDecimal(4.5))).getAll
+ cache.query(qry.setArgs(21L.asInstanceOf[JLong], new java.math.BigDecimal(4.5))).getAll
+ }
+
+ override protected def beforeAll(): Unit = {
+ super.beforeAll()
+
+ createNumberTable(client, DEFAULT_CACHE)
+
+ val configProvider = enclose(null) (x ⇒ () ⇒ {
+ val cfg = IgnitionEx.loadConfiguration(TEST_CONFIG_FILE).get1()
+
+ cfg.setClientMode(true)
+
+ cfg.setIgniteInstanceName("client-2")
+
+ cfg
+ })
+
+ igniteSession = IgniteSparkSession.builder()
+ .config(spark.sparkContext.getConf)
+ .igniteConfigProvider(configProvider)
+ .getOrCreate()
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/10a4c48b/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteOptimizationDateFuncSpec.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteOptimizationDateFuncSpec.scala b/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteOptimizationDateFuncSpec.scala
new file mode 100644
index 0000000..7912cd0
--- /dev/null
+++ b/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteOptimizationDateFuncSpec.scala
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spark
+
+import org.apache.ignite.Ignite
+import org.apache.ignite.cache.query.SqlFieldsQuery
+import org.apache.ignite.internal.IgnitionEx
+import org.apache.ignite.spark.AbstractDataFrameSpec.{DEFAULT_CACHE, TEST_CONFIG_FILE, checkOptimizationResult, enclose}
+import org.apache.spark.sql.ignite.IgniteSparkSession
+import org.junit.runner.RunWith
+import org.scalatest.junit.JUnitRunner
+import java.lang.{Long ⇒ JLong}
+import java.util.{Date ⇒ JDate}
+import java.text.SimpleDateFormat
+import java.util.concurrent.TimeUnit
+import java.util.concurrent.TimeUnit.DAYS
+
+/**
+ * Tests for supported date function push-down.
+ */
+@RunWith(classOf[JUnitRunner])
+class IgniteOptimizationDateFuncSpec extends AbstractDataFrameSpec {
+ var igniteSession: IgniteSparkSession = _
+
+ val format = new SimpleDateFormat("dd.MM.yyyy HH:mm:ss")
+
+ describe("Supported optimized date functions") {
+ it(" - CURRENT_TIMESTAMP") {
+ val df = igniteSession.sql("SELECT id, CURRENT_TIMESTAMP() FROM dates WHERE id = 1")
+
+ checkOptimizationResult(df)
+
+ val data = df.rdd.collect
+
+ assert(data(0).getAs[JLong]("id") == 1L)
+
+ val date: JDate = data(0).getAs[JDate]("current_timestamp()")
+ val millisDiff = new JDate().getTime - date.getTime
+
+ assert(millisDiff <= 30000)
+ }
+
+ it(" - CURRENT_DATE") {
+ val df = igniteSession.sql("SELECT id, CURRENT_DATE() FROM dates WHERE id = 1")
+
+ checkOptimizationResult(df)
+
+ val data = df.rdd.collect
+
+ assert(data(0).getAs[JLong]("id") == 1L)
+
+ val date: JDate = data(0).getAs[JDate]("current_date()")
+ val dayDiff = DAYS.convert(new JDate().getTime - date.getTime, TimeUnit.MILLISECONDS)
+
+ assert(dayDiff <= 1)
+ }
+
+ it(" - HOUR") {
+ val df = igniteSession.sql("SELECT HOUR(val) FROM dates WHERE id = 1")
+
+ checkOptimizationResult(df)
+
+ val data = Tuple1(0)
+
+ checkQueryData(df, data)
+ }
+
+ it(" - MINUTE") {
+ val df = igniteSession.sql("SELECT MINUTE(val) FROM dates WHERE id = 1")
+
+ checkOptimizationResult(df)
+
+ val data = Tuple1(0)
+
+ checkQueryData(df, data)
+ }
+
+ it(" - SECOND") {
+ val df = igniteSession.sql("SELECT SECOND(val) FROM dates WHERE id = 1")
+
+ checkOptimizationResult(df)
+
+ val data = Tuple1(0)
+
+ checkQueryData(df, data)
+ }
+
+ it(" - MONTH") {
+ val df = igniteSession.sql("SELECT MONTH(val) FROM dates WHERE id = 1")
+
+ checkOptimizationResult(df)
+
+ val data = Tuple1(0)
+
+ checkQueryData(df, data)
+ }
+
+ it(" - YEAR") {
+ val df = igniteSession.sql("SELECT YEAR(val) FROM dates WHERE id = 1")
+
+ checkOptimizationResult(df)
+
+ val data = Tuple1(2017)
+
+ checkQueryData(df, data)
+ }
+
+ it(" - QUARTER") {
+ val df = igniteSession.sql("SELECT QUARTER(val) FROM dates WHERE id = 1")
+
+ checkOptimizationResult(df)
+
+ val data = Tuple1(1)
+
+ checkQueryData(df, data)
+ }
+
+ it(" - WEEK") {
+ val df = igniteSession.sql("SELECT WEEKOFYEAR(val) FROM dates WHERE id = 1")
+
+ checkOptimizationResult(df)
+
+ val data = Tuple1(1)
+
+ checkQueryData(df, data)
+ }
+
+ it(" - DAY_OF_MONTH") {
+ val df = igniteSession.sql("SELECT DAYOFMONTH(val) FROM dates WHERE id = 1")
+
+ checkOptimizationResult(df)
+
+ val data = Tuple1(1)
+
+ checkQueryData(df, data)
+ }
+
+ it(" - DAY_OF_YEAR") {
+ val df = igniteSession.sql("SELECT DAYOFYEAR(val) FROM dates WHERE id = 1")
+
+ checkOptimizationResult(df)
+
+ val data = Tuple1(1)
+
+ checkQueryData(df, data)
+ }
+
+ it(" - DATE_ADD") {
+ val df = igniteSession.sql("SELECT DATE_ADD(val, 2) FROM dates WHERE id = 1")
+
+ checkOptimizationResult(df)
+
+ val data = Tuple1(format.parse("03.01.2017 00:00:00"))
+
+ checkQueryData(df, data)
+ }
+
+ it(" - DATEDIFF") {
+ val df = igniteSession.sql("SELECT " +
+ "DATEDIFF(val, TO_DATE('2017-01-02 00:00:00.000', 'yyyy-MM-dd HH:mm:ss.SSS')) FROM dates WHERE id = 1")
+
+ checkOptimizationResult(df)
+
+ val data = Tuple1(1)
+
+ checkQueryData(df, data)
+ }
+
+ it(" - FORMATDATETIME") {
+ val df = igniteSession.sql("SELECT DATE_FORMAT(val, 'yyyy-MM-dd HH:mm:ss.SSS') FROM dates WHERE id = 1")
+
+ checkOptimizationResult(df)
+
+ val data = Tuple1("2017-01-01 00:00:00.000")
+
+ checkQueryData(df, data)
+ }
+ }
+
+ def createDateTable(client: Ignite, cacheName: String): Unit = {
+ val cache = client.cache(cacheName)
+
+ cache.query(new SqlFieldsQuery(
+ """
+ | CREATE TABLE dates (
+ | id LONG,
+ | val DATE,
+ | PRIMARY KEY (id)) WITH "backups=1"
+ """.stripMargin)).getAll
+
+ val qry = new SqlFieldsQuery("INSERT INTO dates(id, val) values (?, ?)")
+
+ cache.query(qry.setArgs(1L.asInstanceOf[JLong], format.parse("01.01.2017 00:00:00"))).getAll
+ }
+
+ override protected def beforeAll(): Unit = {
+ super.beforeAll()
+
+ createDateTable(client, DEFAULT_CACHE)
+
+ val configProvider = enclose(null) (x ⇒ () ⇒ {
+ val cfg = IgnitionEx.loadConfiguration(TEST_CONFIG_FILE).get1()
+
+ cfg.setClientMode(true)
+
+ cfg.setIgniteInstanceName("client-2")
+
+ cfg
+ })
+
+ igniteSession = IgniteSparkSession.builder()
+ .config(spark.sparkContext.getConf)
+ .igniteConfigProvider(configProvider)
+ .getOrCreate()
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/10a4c48b/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteOptimizationDisableEnableSpec.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteOptimizationDisableEnableSpec.scala b/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteOptimizationDisableEnableSpec.scala
new file mode 100644
index 0000000..033af74
--- /dev/null
+++ b/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteOptimizationDisableEnableSpec.scala
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spark
+
+import org.apache.ignite.spark.AbstractDataFrameSpec.TEST_CONFIG_FILE
+import org.apache.ignite.spark.IgniteDataFrameSettings._
+import org.apache.spark.sql.{DataFrame, SparkSession}
+import org.apache.spark.sql.ignite.IgniteOptimization
+import org.junit.runner.RunWith
+import org.scalatest.junit.JUnitRunner
+
+/**
+ * Tests for enabling and disabling Ignite optimization at runtime.
+ */
+@RunWith(classOf[JUnitRunner])
+class IgniteOptimizationDisableEnableSpec extends AbstractDataFrameSpec {
+ var personDataFrame: DataFrame = _
+
+ describe("Ignite Optimization Disabling/Enabling") {
+ it("should add Ignite Optimization to a session on a first query") {
+ if (spark.sparkContext.isStopped)
+ createSparkSession()
+
+ assert(!igniteOptimizationExists(spark), "Session shouldn't contain IgniteOptimization")
+
+ personDataFrame = spark.read
+ .format(FORMAT_IGNITE)
+ .option(OPTION_CONFIG_FILE, TEST_CONFIG_FILE)
+ .option(OPTION_TABLE, "person")
+ .load()
+
+ assert(igniteOptimizationExists(spark),
+ "Session should contains IgniteOptimization after executing query over Ignite Data Frame")
+
+ spark.stop()
+ }
+
+ it("should remove Ignite Optimization if it disabled at runtime") {
+ if (!spark.sparkContext.isStopped)
+ spark.stop()
+
+ val newSession = SparkSession.builder()
+ .appName("Ignite Optimization check")
+ .master("local")
+ .config("spark.executor.instances", "2")
+ .getOrCreate()
+
+ assert(!igniteOptimizationExists(newSession), "Session shouldn't contain IgniteOptimization")
+
+ var newPersonDataFrame = newSession.read
+ .format(FORMAT_IGNITE)
+ .option(OPTION_CONFIG_FILE, TEST_CONFIG_FILE)
+ .option(OPTION_TABLE, "person")
+ .load()
+
+ assert(igniteOptimizationExists(newSession),
+ "Session should contains IgniteOptimization after executing query over Ignite Data Frame")
+
+ newSession.conf.set(OPTION_DISABLE_SPARK_SQL_OPTIMIZATION, "true")
+
+ newPersonDataFrame = newSession.read
+ .format(FORMAT_IGNITE)
+ .option(OPTION_CONFIG_FILE, TEST_CONFIG_FILE)
+ .option(OPTION_TABLE, "person")
+ .load()
+
+ assert(!igniteOptimizationExists(newSession),
+ "Session shouldn't contains IgniteOptimization")
+
+ newSession.close()
+ }
+
+ it("shouldn't add Ignite Optimization to a session when it's disabled") {
+ if (!spark.sparkContext.isStopped)
+ spark.stop()
+
+ val newSession = SparkSession.builder()
+ .appName("Ignite Optimization check")
+ .master("local")
+ .config("spark.executor.instances", "2")
+ .config(OPTION_DISABLE_SPARK_SQL_OPTIMIZATION, "true")
+ .getOrCreate()
+
+            assert(!igniteOptimizationExists(newSession), "Session shouldn't contain IgniteOptimization")
+
+ val newPersonDataFrame = newSession.read
+ .format(FORMAT_IGNITE)
+ .option(OPTION_CONFIG_FILE, TEST_CONFIG_FILE)
+ .option(OPTION_TABLE, "person")
+ .load()
+
+ newPersonDataFrame.createOrReplaceTempView("person")
+
+ val res = newSession.sqlContext.sql("SELECT name FROM person WHERE id = 2").rdd
+
+ res.count should equal(1)
+
+            assert(!igniteOptimizationExists(newSession), "Session shouldn't contain IgniteOptimization")
+
+ newSession.close()
+ }
+ }
+
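+    /** @return True if IgniteOptimization is registered in the session's extra optimizations. */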
+ def igniteOptimizationExists(session: SparkSession): Boolean =
+ session.sessionState.experimentalMethods.extraOptimizations.contains(IgniteOptimization)
+
+ override protected def beforeAll(): Unit = {
+ super.beforeAll()
+
+ createPersonTable(client, "cache1")
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/10a4c48b/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteOptimizationJoinSpec.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteOptimizationJoinSpec.scala b/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteOptimizationJoinSpec.scala
new file mode 100644
index 0000000..b4b36a8
--- /dev/null
+++ b/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteOptimizationJoinSpec.scala
@@ -0,0 +1,543 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spark
+
+import org.apache.ignite.Ignite
+import org.apache.ignite.cache.query.SqlFieldsQuery
+import org.apache.ignite.internal.IgnitionEx
+import org.apache.ignite.spark.AbstractDataFrameSpec.{DEFAULT_CACHE, TEST_CONFIG_FILE, checkOptimizationResult, enclose}
+import org.apache.spark.sql.ignite.IgniteSparkSession
+import org.junit.runner.RunWith
+import org.scalatest.junit.JUnitRunner
+
+import java.lang.{Long ⇒ JLong}
+
+/**
+ * Tests for join and UNION queries executed through IgniteOptimization.
+ */
+@RunWith(classOf[JUnitRunner])
+class IgniteOptimizationJoinSpec extends AbstractDataFrameSpec {
+ var igniteSession: IgniteSparkSession = _
+
+ describe("Optimized join queries") {
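+        // Each test checks the Ignite SQL generated for the Spark plan and verifies the query results.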
+ it("UNION") {
+ val qry =
+ """
+ | SELECT id, val1 as val FROM jt1 UNION
+ | SELECT id, val2 as val FROM jt2 UNION
+ | SELECT id, val3 as val FROM jt3
+ |""".stripMargin
+
+ val df = igniteSession.sql(qry)
+
+ checkOptimizationResult(df,
+ "SELECT id, val FROM (SELECT id, val1 as val FROM jt1 UNION " +
+ "SELECT id, val2 as val FROM jt2 UNION " +
+ "SELECT id, val3 as val FROM jt3) table1")
+
+ val data = (
+ (1L, "A"),
+ (1L, "B"),
+ (2L, "B"),
+ (2L, "C"),
+ (2L, "D"),
+ (3L, "C"),
+ (3L, "D"),
+ (3L, "E"))
+
+ checkQueryData(df, data, row ⇒ (row.getAs[JLong](0), row.getAs[String](1)))
+ }
+
+ it("UNION ALL") {
+ val qry =
+ """
+ | SELECT id, val1 as val FROM jt1 UNION ALL
+ | SELECT id, val2 as val FROM jt2 UNION ALL
+ | SELECT id, val3 as val FROM jt3
+ |""".stripMargin
+
+ val df = igniteSession.sql(qry)
+
+ checkOptimizationResult(df,
+ "SELECT id, val1 as val FROM jt1 UNION " +
+ "SELECT id, val2 as val FROM jt2 UNION " +
+ "SELECT id, val3 as val FROM jt3")
+
+ val data = (
+ (1L, "A"),
+ (1L, "B"),
+ (2L, "B"),
+ (2L, "C"),
+ (2L, "D"),
+ (3L, "C"),
+ (3L, "D"),
+ (3L, "E"))
+
+ checkQueryData(df, data, row ⇒ (row.getAs[JLong](0), row.getAs[String](1)))
+ }
+
+ it("UNION ALL ORDER") {
+ val qry =
+ """
+ | SELECT id, val1 as val FROM jt1 UNION ALL
+ | SELECT id, val2 as val FROM jt2 UNION ALL
+ | SELECT id, val3 as val FROM jt3
+ | ORDER BY id DESC, val
+ |""".stripMargin
+
+ val df = igniteSession.sql(qry)
+
+ checkOptimizationResult(df,
+ "SELECT id, val1 as val FROM jt1 UNION " +
+ "SELECT id, val2 as val FROM jt2 UNION " +
+ "SELECT id, val3 as val FROM jt3 " +
+ "ORDER BY id DESC, val")
+
+ val data = (
+ (3L, "C"),
+ (3L, "D"),
+ (3L, "E"),
+ (2L, "B"),
+ (2L, "C"),
+ (2L, "D"),
+ (1L, "A"),
+ (1L, "B")
+ )
+
+ checkQueryData(df, data, _ ⇒ 0)
+ }
+
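+        // Aggregation with HAVING over a UNION should be pushed down to Ignite as a single query.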
+ it("UNION WITH AGGREGATE") {
+ val qry =
+ """
+ | SELECT VAL, COUNT(*) FROM (
+ | SELECT id, val1 as val FROM jt1 UNION
+ | SELECT id, val2 as val FROM jt2 UNION
+ | SELECT id, val3 as val FROM jt3 ) t1
+ | GROUP BY val HAVING COUNT(*) > 1
+ |""".stripMargin
+
+ val df = igniteSession.sql(qry)
+
+ checkOptimizationResult(df,
+ "SELECT VAL, count(1) FROM (" +
+ "SELECT id, val1 AS val FROM JT1 UNION " +
+ "SELECT id, val2 AS val FROM JT2 UNION " +
+ "SELECT id, val3 AS val FROM JT3" +
+ ") table1 GROUP BY val HAVING count(1) > 1")
+
+ val data = (
+ ("B", 2L),
+ ("C", 2L),
+ ("D", 2L)
+ )
+
+ checkQueryData(df, data)
+ }
+
+ it("AGGREGATE ON AGGREGATE RESULT") {
+ val qry =
+ """
+ | SELECT SUM(cnt) FROM (
+ | SELECT VAL, COUNT(*) as CNT FROM (
+ | SELECT id, val1 as val FROM jt1 UNION
+ | SELECT id, val2 as val FROM jt2 UNION
+ | SELECT id, val3 as val FROM jt3 ) t1
+ | GROUP BY val HAVING COUNT(*) > 1
+ | ) t1
+ |""".stripMargin
+
+ val df = igniteSession.sql(qry)
+
+ checkOptimizationResult(df,
+ "SELECT CAST(SUM(cnt) as BIGINT) as \"SUM(cnt)\" FROM (" +
+ "SELECT count(1) as cnt FROM (" +
+ "SELECT id, val1 as val FROM jt1 UNION " +
+ "SELECT id, val2 as val FROM jt2 UNION " +
+ "SELECT id, val3 as val FROM jt3" +
+ ") table1 GROUP BY val HAVING count(1) > 1) table2")
+
+ val data = Tuple1(6.0)
+
+ checkQueryData(df, data)
+ }
+
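+        // For a self join the optimizer should assign a unique alias (table1) to the second occurrence of the table.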
+ it("SELF INNER JOIN") {
+ val qry =
+ """
+ |SELECT
+ | jt1.id,
+ | jt1.val1,
+ | jt2.id,
+ | jt2.val1
+ |FROM
+ | jt1 JOIN
+ | jt1 as jt2 ON jt1.val1 = jt2.val1
+ |""".stripMargin
+
+ val df = igniteSession.sql(qry)
+
+ checkOptimizationResult(df, "SELECT JT1.ID, JT1.VAL1, table1.ID, table1.VAL1 " +
+ "FROM JT1 JOIN JT1 AS table1 ON jt1.val1 = table1.val1 " +
+ "WHERE jt1.val1 IS NOT NULL AND table1.val1 IS NOT NULL")
+
+ val data = (
+ (1, "A", 1, "A"),
+ (2, "B", 2, "B"),
+ (3, "C", 3, "C")
+ )
+
+ checkQueryData(df, data)
+ }
+
+ it("SELF INNER JOIN WITH WHERE") {
+ val qry =
+ """
+ |SELECT
+ | jt1.id,
+ | jt1.val1,
+ | jt2.id,
+ | jt2.val1
+ |FROM
+ | jt1 JOIN
+ | jt1 as jt2 ON jt1.val1 = jt2.val1
+ |WHERE jt2.val1 = 'A'
+ |""".stripMargin
+
+ val df = igniteSession.sql(qry)
+
+            /* checkOptimizationResult(df, "SELECT JT1.ID, JT1.VAL1, table1.ID, table1.VAL1 " +
+                "FROM JT1 JOIN JT1 as table1 ON JT1.val1 = table1.val1 " +
+                "WHERE JT1.val1 = 'A' AND JT1.val1 IS NOT NULL AND table1.val1 IS NOT NULL AND table1.val1 = 'A'") */
+
+ val data = Tuple1(
+ (1, "A", 1, "A")
+ )
+
+ checkQueryData(df, data)
+ }
+
+ it("INNER JOIN") {
+ val qry =
+ """
+ |SELECT
+ | jt1.id as id1,
+ | jt1.val1,
+ | jt2.id as id2,
+ | jt2.val2
+ |FROM
+ | jt1 JOIN
+ | jt2 ON jt1.val1 = jt2.val2
+ |""".stripMargin
+
+ val df = igniteSession.sql(qry)
+
+ checkOptimizationResult(df, "SELECT JT1.ID AS id1, JT1.VAL1, JT2.ID AS id2, JT2.VAL2 " +
+ "FROM JT1 JOIN JT2 ON jt1.val1 = jt2.val2 " +
+ "WHERE jt1.val1 IS NOT NULL AND jt2.val2 IS NOT NULL")
+
+ val data = (
+ (2, "B", 1, "B"),
+ (3, "C", 2, "C")
+ )
+
+ checkQueryData(df, data)
+ }
+
+ it("INNER JOIN WITH WHERE") {
+ val qry =
+ """
+ |SELECT
+ | jt1.id as id1,
+ | jt1.val1,
+ | jt2.id as id2,
+ | jt2.val2
+ |FROM
+ | jt1 JOIN
+ | jt2 ON jt1.val1 = jt2.val2
+ |WHERE
+ | jt1.id < 10
+ |""".stripMargin
+
+ val df = igniteSession.sql(qry)
+
+ checkOptimizationResult(df, "SELECT jt1.id as id1, jt1.val1, jt2.id as id2, jt2.val2 " +
+ "FROM jt1 JOIN jt2 ON jt1.val1 = jt2.val2 " +
+ "WHERE jt1.id IS NOT NULL AND jt1.id < 10 AND jt1.val1 IS NOT NULL and jt2.val2 IS NOT NULL")
+
+ val data = (
+ (2, "B", 1, "B"),
+ (3, "C", 2, "C")
+ )
+
+ checkQueryData(df, data)
+ }
+
+ it("LEFT JOIN") {
+ val qry =
+ """
+ |SELECT
+ | jt1.id as id1,
+ | jt1.val1,
+ | jt2.id as id2,
+ | jt2.val2
+ |FROM
+ | jt1 LEFT JOIN
+ | jt2 ON jt1.val1 = jt2.val2
+ |""".stripMargin
+
+ val df = igniteSession.sql(qry)
+
+ checkOptimizationResult(df, "SELECT jt1.id as id1, jt1.val1, jt2.id as id2, jt2.val2 " +
+ "FROM jt1 LEFT JOIN jt2 ON jt1.val1 = jt2.val2")
+
+ val data = (
+ (1, "A", null, null),
+ (2, "B", 1, "B"),
+ (3, "C", 2, "C")
+ )
+
+ checkQueryData(df, data)
+ }
+
+ it("RIGHT JOIN") {
+ val qry =
+ """
+ |SELECT
+ | jt1.id as id1,
+ | jt1.val1,
+ | jt2.id as id2,
+ | jt2.val2
+ |FROM
+ | jt1 RIGHT JOIN
+ | jt2 ON jt1.val1 = jt2.val2
+ |""".stripMargin
+
+ val df = igniteSession.sql(qry)
+
+ checkOptimizationResult(df, "SELECT jt1.id as id1, jt1.val1, jt2.id as id2, jt2.val2 " +
+ "FROM jt1 RIGHT JOIN jt2 ON jt1.val1 = jt2.val2")
+
+ val data = (
+ (2, "B", 1, "B"),
+ (3, "C", 2, "C"),
+ (null, null, 3, "D")
+ )
+
+ checkQueryData(df, data, r ⇒ if (r.get(0) == null) 100L else r.getAs[Long](0))
+ }
+
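+        // A three-way join should compile into a nested query: the first join becomes an inline view aliased table1.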
+ it("JOIN 3 TABLE") {
+ val qry =
+ """
+ |SELECT
+ | jt1.id as id1,
+ | jt1.val1 as val1,
+ | jt2.id as id2,
+ | jt2.val2 as val2,
+ | jt3.id as id3,
+ | jt3.val3 as val3
+ |FROM
+ | jt1 LEFT JOIN
+ | jt2 ON jt1.val1 = jt2.val2 LEFT JOIN
+ | jt3 ON jt1.val1 = jt3.val3
+ |""".stripMargin
+
+ val df = igniteSession.sql(qry)
+
+ checkOptimizationResult(df,
+ "SELECT table1.id as id1, table1.val1, table1.id_2 as id2, table1.val2, jt3.id as id3, jt3.val3 " +
+ "FROM (" +
+ "SELECT jt1.val1, jt1.id, jt2.val2, jt2.id as id_2 " +
+ "FROM JT1 LEFT JOIN jt2 ON jt1.val1 = jt2.val2) table1 LEFT JOIN " +
+ "jt3 ON table1.val1 = jt3.val3")
+
+ val data = (
+ (1, "A", null, null, 1, "A"),
+ (2, "B", 1, "B", null, null),
+ (3, "C", 2, "C", null, null))
+
+ checkQueryData(df, data)
+ }
+
+ it("JOIN 3 TABLE AND AGGREGATE") {
+ val qry =
+ """
+ |SELECT SUM(id1) FROM (
+ | SELECT
+ | jt1.id as id1,
+ | jt1.val1 as val1,
+ | jt2.id as id2,
+ | jt2.val2 as val2,
+ | jt3.id as id3,
+ | jt3.val3 as val3
+ |FROM
+ | jt1 LEFT JOIN
+ | jt2 ON jt1.val1 = jt2.val2 LEFT JOIN
+ | jt3 ON jt1.val1 = jt3.val3
+ |) WHERE CONCAT(val1, val2) = 'BB' OR CONCAT(val1, val3) = 'AA'
+ |""".stripMargin
+
+ val df = igniteSession.sql(qry)
+
+ checkOptimizationResult(df,
+ "SELECT CAST(SUM(table1.ID) AS BIGINT) AS \"sum(id1)\" FROM " +
+ "(SELECT JT1.VAL1, JT1.ID, JT2.VAL2 FROM JT1 LEFT JOIN JT2 ON JT1.val1 = JT2.val2) table1 LEFT JOIN " +
+ "JT3 ON table1.val1 = JT3.val3 " +
+ "WHERE CONCAT(table1.val1, table1.val2) = 'BB' OR CONCAT(table1.val1, JT3.val3) = 'AA'")
+
+ val data = Tuple1(3)
+
+ checkQueryData(df, data, _ ⇒ 0)
+ }
+
+ it("INNER JOIN SUBQUERY") {
+ val qry =
+ """
+ |SELECT sum_id, val1, val2 FROM (
+ | SELECT
+ | jt1.id + jt2.id as sum_id,
+ | jt1.val1 as val1,
+ | jt2.val2 as val2
+ | FROM
+ | jt1 JOIN
+ | jt2 ON jt1.val1 = jt2.val2
+ |) t1 WHERE sum_id != 15
+ |""".stripMargin
+
+ val df = igniteSession.sql(qry)
+
+ checkOptimizationResult(df,
+ "SELECT jt1.id + jt2.id as sum_id, jt1.val1, jt2.val2 FROM " +
+ "jt1 JOIN jt2 ON NOT jt1.id + jt2.id = 15 AND jt1.val1 = jt2.val2 " +
+ "WHERE " +
+ "jt1.id IS NOT NULL AND " +
+ "jt1.val1 IS NOT NULL AND " +
+ "jt2.id IS NOT NULL AND " +
+ "jt2.val2 IS NOT NULL"
+ )
+
+ val data = (
+ (3, "B", "B"),
+ (5, "C", "C")
+ )
+
+ checkQueryData(df, data)
+ }
+
+ it("INNER JOIN SUBQUERY - 2") {
+ val qry =
+ """
+ |SELECT SUM(sum_id) FROM (
+ | SELECT
+ | jt1.id + jt2.id as sum_id
+ | FROM
+ | jt1 JOIN
+ | jt2 ON jt1.val1 = jt2.val2
+ |) t1 WHERE sum_id != 15
+ |""".stripMargin
+
+ val df = igniteSession.sql(qry)
+
+ checkOptimizationResult(df,
+ "SELECT CAST(SUM(JT1.ID + JT2.ID) AS BIGINT) AS \"sum(sum_id)\" " +
+ "FROM JT1 JOIN JT2 ON NOT JT1.id + JT2.id = 15 AND JT1.val1 = JT2.val2 " +
+ "WHERE JT1.id IS NOT NULL AND JT1.val1 IS NOT NULL AND JT2.id IS NOT NULL AND JT2.val2 IS NOT NULL")
+
+ val data = Tuple1(8)
+
+ checkQueryData(df, data)
+ }
+ }
+
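+    /**
+     * Creates test tables jt1, jt2 and jt3, fills them with data and indexes the value columns
+     * used in join conditions.
+     */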
+ def createJoinedTables(client: Ignite, cacheName: String): Unit = {
+ val cache = client.cache(cacheName)
+
+ cache.query(new SqlFieldsQuery(
+ """
+ | CREATE TABLE jt1 (
+ | id LONG,
+ | val1 VARCHAR,
+ | PRIMARY KEY (id)) WITH "backups=1"
+ """.stripMargin)).getAll
+
+ cache.query(new SqlFieldsQuery(
+ """
+ | CREATE TABLE jt2 (
+ | id LONG,
+ | val2 VARCHAR,
+ | PRIMARY KEY (id)) WITH "backups=1"
+ """.stripMargin)).getAll
+
+ cache.query(new SqlFieldsQuery(
+ """
+ | CREATE TABLE jt3 (
+ | id LONG,
+ | val3 VARCHAR,
+ | PRIMARY KEY (id)) WITH "backups=1"
+ """.stripMargin)).getAll
+
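+        // Fill the tables so that the value columns partially overlap across jt1, jt2 and jt3.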
+ var qry = new SqlFieldsQuery("INSERT INTO jt1 (id, val1) values (?, ?)")
+
+ cache.query(qry.setArgs(1L.asInstanceOf[JLong], "A")).getAll
+ cache.query(qry.setArgs(2L.asInstanceOf[JLong], "B")).getAll
+ cache.query(qry.setArgs(3L.asInstanceOf[JLong], "C")).getAll
+
+ qry = new SqlFieldsQuery("INSERT INTO jt2 (id, val2) values (?, ?)")
+
+ cache.query(qry.setArgs(1L.asInstanceOf[JLong], "B")).getAll
+ cache.query(qry.setArgs(2L.asInstanceOf[JLong], "C")).getAll
+ cache.query(qry.setArgs(3L.asInstanceOf[JLong], "D")).getAll
+
+ qry = new SqlFieldsQuery("INSERT INTO jt3 (id, val3) values (?, ?)")
+
+ cache.query(qry.setArgs(1L.asInstanceOf[JLong], "A")).getAll
+ cache.query(qry.setArgs(2L.asInstanceOf[JLong], "D")).getAll
+ cache.query(qry.setArgs(3L.asInstanceOf[JLong], "E")).getAll
+
+ cache.query(new SqlFieldsQuery("CREATE INDEX idx1 ON jt1(val1)")).getAll
+ cache.query(new SqlFieldsQuery("CREATE INDEX idx2 ON jt2(val2)")).getAll
+ cache.query(new SqlFieldsQuery("CREATE INDEX idx3 ON jt3(val3)")).getAll
+ }
+
+ override protected def beforeAll(): Unit = {
+ super.beforeAll()
+
+ createPersonTable(client, DEFAULT_CACHE)
+
+ createCityTable(client, DEFAULT_CACHE)
+
+ createJoinedTables(client, DEFAULT_CACHE)
+
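+        // Configuration provider for a separate client node ("client-2") used by the IgniteSparkSession.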
+ val configProvider = enclose(null) (x ⇒ () ⇒ {
+ val cfg = IgnitionEx.loadConfiguration(TEST_CONFIG_FILE).get1()
+
+ cfg.setClientMode(true)
+
+ cfg.setIgniteInstanceName("client-2")
+
+ cfg
+ })
+
+ igniteSession = IgniteSparkSession.builder()
+ .config(spark.sparkContext.getConf)
+ .igniteConfigProvider(configProvider)
+ .getOrCreate()
+ }
+}