You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ni...@apache.org on 2018/04/20 08:59:34 UTC
[3/3] ignite git commit: IGNITE-7077: Implementation of Spark query
optimization. - Fixes #3397.
IGNITE-7077: Implementation of Spark query optimization. - Fixes #3397.
Signed-off-by: Nikolay Izhikov <ni...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/10a4c48b
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/10a4c48b
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/10a4c48b
Branch: refs/heads/master
Commit: 10a4c48bf722f49e7eca09c50cf4af6c2ec59b1b
Parents: 8008601
Author: Nikolay Izhikov <ni...@apache.org>
Authored: Fri Apr 20 11:23:42 2018 +0300
Committer: Nikolay Izhikov <ni...@apache.org>
Committed: Fri Apr 20 11:23:42 2018 +0300
----------------------------------------------------------------------
.../ignite/spark/IgniteDataFrameSettings.scala | 9 +
.../org/apache/ignite/spark/IgniteRDD.scala | 7 +-
.../spark/impl/IgniteRelationProvider.scala | 20 +-
.../impl/IgniteSQLAccumulatorRelation.scala | 98 ++++
.../spark/impl/IgniteSQLDataFrameRDD.scala | 16 +-
.../ignite/spark/impl/IgniteSQLRelation.scala | 67 +--
.../optimization/AggregateExpressions.scala | 114 ++++
.../optimization/ConditionExpressions.scala | 160 ++++++
.../impl/optimization/DateExpressions.scala | 127 +++++
.../impl/optimization/IgniteQueryContext.scala | 52 ++
.../impl/optimization/MathExpressions.scala | 263 +++++++++
.../impl/optimization/SimpleExpressions.scala | 180 ++++++
.../impl/optimization/StringExpressions.scala | 154 ++++++
.../optimization/SupportedExpressions.scala | 42 ++
.../impl/optimization/SystemExpressions.scala | 122 +++++
.../accumulator/JoinSQLAccumulator.scala | 222 ++++++++
.../accumulator/QueryAccumulator.scala | 70 +++
.../accumulator/SelectAccumulator.scala | 70 +++
.../accumulator/SingleTableSQLAccumulator.scala | 124 +++++
.../accumulator/UnionSQLAccumulator.scala | 63 +++
.../spark/impl/optimization/package.scala | 230 ++++++++
.../org/apache/ignite/spark/impl/package.scala | 48 +-
.../spark/sql/ignite/IgniteOptimization.scala | 436 +++++++++++++++
.../spark/sql/ignite/IgniteSparkSession.scala | 10 +-
.../ignite/spark/AbstractDataFrameSpec.scala | 68 ++-
.../apache/ignite/spark/IgniteCatalogSpec.scala | 7 +-
.../spark/IgniteDataFrameSchemaSpec.scala | 5 +-
.../ignite/spark/IgniteDataFrameSuite.scala | 9 +-
.../IgniteOptimizationAggregationFuncSpec.scala | 189 +++++++
.../spark/IgniteOptimizationDateFuncSpec.scala | 230 ++++++++
.../IgniteOptimizationDisableEnableSpec.scala | 127 +++++
.../spark/IgniteOptimizationJoinSpec.scala | 543 +++++++++++++++++++
.../spark/IgniteOptimizationMathFuncSpec.scala | 358 ++++++++++++
.../ignite/spark/IgniteOptimizationSpec.scala | 305 +++++++++++
.../IgniteOptimizationStringFuncSpec.scala | 313 +++++++++++
.../IgniteOptimizationSystemFuncSpec.scala | 147 +++++
36 files changed, 4924 insertions(+), 81 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/10a4c48b/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteDataFrameSettings.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteDataFrameSettings.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteDataFrameSettings.scala
index 6bff476..9daaec4 100644
--- a/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteDataFrameSettings.scala
+++ b/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteDataFrameSettings.scala
@@ -153,4 +153,13 @@ object IgniteDataFrameSettings {
* @see [[org.apache.ignite.IgniteDataStreamer#perNodeParallelOperations(int)]]
*/
val OPTION_STREAMER_PER_NODE_PARALLEL_OPERATIONS = "streamerPerNodeParallelOperations"
+
+ /**
+ * Option for a [[org.apache.spark.sql.SparkSession]] configuration.
+ * If `true` then all Ignite optimization of Spark SQL statements will be disabled.
+ * Default value is `false`.
+ *
+ * @see [[org.apache.spark.sql.ignite.IgniteOptimization]]
+ */
+ val OPTION_DISABLE_SPARK_SQL_OPTIMIZATION = "ignite.disableSparkSQLOptimization"
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/10a4c48b/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala
index d87ea0a..5fb81b6 100644
--- a/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala
+++ b/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala
@@ -344,6 +344,11 @@ class IgniteRDD[K, V] (
object IgniteRDD {
/**
+ * Default decimal type.
+ */
+ private[spark] val DECIMAL = DecimalType(DecimalType.MAX_PRECISION, 3)
+
+ /**
* Gets Spark data type based on type name.
*
* @param typeName Type name.
@@ -357,7 +362,7 @@ object IgniteRDD {
case "java.lang.Long" ⇒ LongType
case "java.lang.Float" ⇒ FloatType
case "java.lang.Double" ⇒ DoubleType
- case "java.math.BigDecimal" ⇒ DataTypes.createDecimalType()
+ case "java.math.BigDecimal" ⇒ DECIMAL
case "java.lang.String" ⇒ StringType
case "java.util.Date" ⇒ DateType
case "java.sql.Date" ⇒ DateType
http://git-wip-us.apache.org/repos/asf/ignite/blob/10a4c48b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteRelationProvider.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteRelationProvider.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteRelationProvider.scala
index a9f9f89..e4fa9f7 100644
--- a/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteRelationProvider.scala
+++ b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteRelationProvider.scala
@@ -26,6 +26,7 @@ import org.apache.ignite.spark.IgniteDataFrameSettings._
import org.apache.ignite.spark.impl.QueryHelper.{createTable, dropTable, ensureCreateTableOptions, saveTable}
import org.apache.spark.sql.SaveMode.{Append, Overwrite}
import org.apache.spark.sql.ignite.IgniteExternalCatalog.{IGNITE_PROTOCOL, OPTION_GRID}
+import org.apache.spark.sql.ignite.IgniteOptimization
import org.apache.spark.sql.sources._
import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode}
@@ -176,11 +177,28 @@ class IgniteRelationProvider extends RelationProvider
* @param sqlCtx SQL context.
* @return Ignite SQL relation.
*/
- private def createRelation(igniteCtx: IgniteContext, tblName: String, sqlCtx: SQLContext): BaseRelation =
+ private def createRelation(igniteCtx: IgniteContext, tblName: String, sqlCtx: SQLContext): BaseRelation = {
+ val optimizationDisabled =
+ sqlCtx.sparkSession.conf.get(OPTION_DISABLE_SPARK_SQL_OPTIMIZATION, "false").toBoolean
+
+ val experimentalMethods = sqlCtx.sparkSession.sessionState.experimentalMethods
+
+ if (optimizationDisabled) {
+ experimentalMethods.extraOptimizations =
+ experimentalMethods.extraOptimizations.filter(_ != IgniteOptimization)
+ }
+ else {
+ val optimizationExists = experimentalMethods.extraOptimizations.contains(IgniteOptimization)
+
+ if (!optimizationExists)
+ experimentalMethods.extraOptimizations = experimentalMethods.extraOptimizations :+ IgniteOptimization
+ }
+
IgniteSQLRelation(
igniteCtx,
tblName,
sqlCtx)
+ }
/**
* @param params Params.
http://git-wip-us.apache.org/repos/asf/ignite/blob/10a4c48b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteSQLAccumulatorRelation.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteSQLAccumulatorRelation.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteSQLAccumulatorRelation.scala
new file mode 100644
index 0000000..6eb600a
--- /dev/null
+++ b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteSQLAccumulatorRelation.scala
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spark.impl
+
+import org.apache.ignite.spark.impl
+import org.apache.ignite.spark.impl.optimization.accumulator.{JoinSQLAccumulator, QueryAccumulator}
+import org.apache.ignite.spark.impl.optimization.isSimpleTableAcc
+import org.apache.spark.Partition
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.{Row, SQLContext}
+import org.apache.spark.sql.sources.{BaseRelation, TableScan}
+import org.apache.spark.sql.types.{Metadata, StructField, StructType}
+
+/**
+ * Relation that reads data from Ignite using the query generated by a <code>QueryAccumulator</code>.
+ * <code>QueryAccumulator</code> is generated by <code>IgniteOptimization</code>.
+ *
+ * @param acc Accumulator that supplies the output columns, the compiled query and the Ignite query context.
+ * @param sqlContext SQL context of this relation.
+ * @see IgniteOptimization
+ */
+class IgniteSQLAccumulatorRelation[K, V](val acc: QueryAccumulator)
+ (@transient val sqlContext: SQLContext) extends BaseRelation with TableScan {
+
+ /**
+ * @inheritdoc
+ *
+ * One field is created per accumulator output column; name, data type and nullability are copied
+ * from the column and the field metadata is always empty.
+ */
+ override def schema: StructType =
+ StructType(acc.output.map { c ⇒
+ StructField(
+ name = c.name,
+ dataType = c.dataType,
+ nullable = c.nullable,
+ metadata = Metadata.empty)
+ })
+
+ /**
+ * @inheritdoc
+ *
+ * The RDD runs the query compiled by the accumulator without query arguments. The distributed join
+ * flag is set when the accumulator plan contains a `JoinSQLAccumulator`.
+ */
+ override def buildScan(): RDD[Row] =
+ IgniteSQLDataFrameRDD[K, V](
+ acc.igniteQueryContext.igniteContext,
+ acc.igniteQueryContext.cacheName,
+ schema,
+ acc.compileQuery(),
+ List.empty,
+ calcPartitions,
+ isDistributeJoin(acc)
+ )
+
+ /**
+ * @inheritdoc
+ *
+ * Note: the query is compiled on every call in order to include it in the description.
+ */
+ override def toString: String =
+ s"IgniteSQLAccumulatorRelation(columns=[${acc.output.map(_.name).mkString(", ")}], qry=${acc.compileQuery()})"
+
+ /**
+ * @return Array of Spark partitions used to read the query result.
+ */
+ private def calcPartitions: Array[Partition] =
+ //If the accumulator holds a complex query (join, aggregation, limit, order, etc.),
+ //we have to load the data from Ignite as a single Spark partition.
+ //That single partition has index 0, no primary node and lists every Ignite partition of the cache.
+ if (!isSimpleTableAcc(acc)){
+ val aff = acc.igniteQueryContext.igniteContext.ignite().affinity(acc.igniteQueryContext.cacheName)
+
+ val parts = aff.partitions()
+
+ Array(IgniteDataFramePartition(0, primary = null, igniteParts = (0 until parts).toList))
+ }
+ else
+ impl.calcPartitions(acc.igniteQueryContext.igniteContext, acc.igniteQueryContext.cacheName)
+
+ /**
+ * Walks the plan recursively through `children`.
+ *
+ * @param acc Plan.
+ * @return True if the plan or any of its descendants is a `JoinSQLAccumulator`, false otherwise.
+ */
+ private def isDistributeJoin(acc: LogicalPlan): Boolean =
+ acc match {
+ case _: JoinSQLAccumulator ⇒
+ true
+
+ case _ ⇒
+ acc.children.exists(isDistributeJoin)
+ }
+}
+
+/**
+ * Factory for [[IgniteSQLAccumulatorRelation]].
+ */
+object IgniteSQLAccumulatorRelation {
+ /**
+ * @param acc Accumulator to wrap.
+ * @return Relation bound to the SQL context stored in the accumulator's query context.
+ */
+ def apply[K, V](acc: QueryAccumulator): IgniteSQLAccumulatorRelation[K, V] =
+ new IgniteSQLAccumulatorRelation[K, V](acc)(acc.igniteQueryContext.sqlContext)
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/10a4c48b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteSQLDataFrameRDD.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteSQLDataFrameRDD.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteSQLDataFrameRDD.scala
index 93ef529..ec502fc 100644
--- a/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteSQLDataFrameRDD.scala
+++ b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteSQLDataFrameRDD.scala
@@ -36,7 +36,8 @@ class IgniteSQLDataFrameRDD[K, V](
schema: StructType,
qryStr: String,
args: List[_],
- parts: Array[Partition]) extends
+ parts: Array[Partition],
+ distributedJoin: Boolean) extends
IgniteSqlRDD[Row, JList[_], K, V](
ic,
cacheName,
@@ -56,13 +57,17 @@ class IgniteSQLDataFrameRDD[K, V](
override def compute(partition: Partition, context: TaskContext): Iterator[Row] = {
val qry0 = new SqlFieldsQuery(qryStr)
+ qry0.setDistributedJoins(distributedJoin)
+
if (args.nonEmpty)
qry0.setArgs(args.map(_.asInstanceOf[Object]): _*)
val ccfg = ic.ignite().cache[K, V](cacheName).getConfiguration(classOf[CacheConfiguration[K, V]])
- if (ccfg.getCacheMode != CacheMode.REPLICATED)
- qry0.setPartitions(partition.asInstanceOf[IgniteDataFramePartition].igniteParts: _*)
+ val ignitePartition = partition.asInstanceOf[IgniteDataFramePartition]
+
+ if (ccfg.getCacheMode != CacheMode.REPLICATED && ignitePartition.igniteParts.nonEmpty && !distributedJoin)
+ qry0.setPartitions(ignitePartition.igniteParts: _*)
qry = qry0
@@ -76,7 +81,8 @@ object IgniteSQLDataFrameRDD {
schema: StructType,
qryStr: String,
args: List[_],
- parts: Array[Partition] = Array(IgnitePartition(0))) = {
- new IgniteSQLDataFrameRDD(ic, cacheName, schema, qryStr, args, parts)
+ parts: Array[Partition] = Array(IgnitePartition(0)),
+ distributedJoin: Boolean = false): IgniteSQLDataFrameRDD[K, V] = {
+ new IgniteSQLDataFrameRDD[K, V](ic, cacheName, schema, qryStr, args, parts, distributedJoin)
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/10a4c48b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteSQLRelation.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteSQLRelation.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteSQLRelation.scala
index 1fb8de7..485ddf6 100644
--- a/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteSQLRelation.scala
+++ b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteSQLRelation.scala
@@ -18,10 +18,8 @@
package org.apache.ignite.spark.impl
import org.apache.ignite.IgniteException
-import org.apache.ignite.cache.{CacheMode, QueryEntity}
-import org.apache.ignite.cluster.ClusterNode
-import org.apache.ignite.configuration.CacheConfiguration
-import org.apache.ignite.spark.{IgniteContext, IgniteRDD}
+import org.apache.ignite.cache.QueryEntity
+import org.apache.ignite.spark.{IgniteContext, IgniteRDD, impl}
import org.apache.spark.Partition
import org.apache.spark.internal.Logging
import org.apache.spark.rdd.RDD
@@ -30,14 +28,13 @@ import org.apache.spark.sql.types._
import org.apache.spark.sql.{Row, SQLContext}
import scala.collection.JavaConversions._
-import scala.collection.mutable.ArrayBuffer
/**
* Apache Ignite implementation of Spark BaseRelation with PrunedFilteredScan for Ignite SQL Tables
*/
class IgniteSQLRelation[K, V](
- private[spark] val ic: IgniteContext,
- private[spark] val tableName: String)
+ private[apache] val ic: IgniteContext,
+ private[apache] val tableName: String)
(@transient val sqlContext: SQLContext) extends BaseRelation with PrunedFilteredScan with Logging {
/**
@@ -92,41 +89,13 @@ class IgniteSQLRelation[K, V](
qryAndArgs
}
- private def calcPartitions(filters: Array[Filter]): Array[Partition] = {
- val cache = ic.ignite().cache[K, V](cacheName)
-
- val ccfg = cache.getConfiguration(classOf[CacheConfiguration[K, V]])
-
- if (ccfg.getCacheMode == CacheMode.REPLICATED) {
- val serverNodes = ic.ignite().cluster().forCacheNodes(cacheName).forServers().nodes()
-
- Array(IgniteDataFramePartition(0, serverNodes.head, Stream.from(0).take(1024).toList))
- }
- else {
- val aff = ic.ignite().affinity(cacheName)
-
- val parts = aff.partitions()
-
- val nodesToParts = (0 until parts).foldLeft(Map[ClusterNode, ArrayBuffer[Int]]()) {
- case (nodeToParts, ignitePartIdx) ⇒
- val primary = aff.mapPartitionToPrimaryAndBackups(ignitePartIdx).head
-
- if (nodeToParts.contains(primary)) {
- nodeToParts(primary) += ignitePartIdx
-
- nodeToParts
- }
- else
- nodeToParts + (primary → ArrayBuffer[Int](ignitePartIdx))
- }
-
- val partitions = nodesToParts.zipWithIndex.map { case ((node, nodesParts), i) ⇒
- IgniteDataFramePartition(i, node, nodesParts.toList)
- }
-
- partitions.toArray
- }
- }
+ /**
+ * Computes spark partitions for this relation.
+ *
+ * @return Array of IgniteDataFramePartition.
+ */
+ private def calcPartitions(filters: Array[Filter]): Array[Partition] =
+ impl.calcPartitions(ic, cacheName)
/**
* Cache name for a table name.
@@ -134,20 +103,6 @@ class IgniteSQLRelation[K, V](
private lazy val cacheName: String =
sqlCacheName(ic.ignite(), tableName)
.getOrElse(throw new IgniteException(s"Unknown table $tableName"))
-
- /**
- * Utility method to add clause to sql WHERE string.
- *
- * @param filterStr Current filter string
- * @param clause Clause to add.
- * @return Filter string.
- */
- private def addStrClause(filterStr: String, clause: String) =
- if (filterStr.isEmpty)
- clause
- else
- filterStr + " AND " + clause
-
}
object IgniteSQLRelation {
http://git-wip-us.apache.org/repos/asf/ignite/blob/10a4c48b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/optimization/AggregateExpressions.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/impl/optimization/AggregateExpressions.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/optimization/AggregateExpressions.scala
new file mode 100644
index 0000000..421a9a9
--- /dev/null
+++ b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/optimization/AggregateExpressions.scala
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spark.impl.optimization
+
+import org.apache.spark.sql.catalyst.expressions.aggregate._
+import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.types._
+
+/**
+ * Support for aggregate expressions such as `sum` or `avg`.
+ */
+private[optimization] object AggregateExpressions extends SupportedExpressions {
+ /**
+ * @inheritdoc
+ *
+ * An aggregate is supported when every expression it wraps passes `checkChild`.
+ */
+ def apply(expr: Expression, checkChild: (Expression) ⇒ Boolean): Boolean = expr match {
+ case AggregateExpression(aggFunc, _, _, _) ⇒
+ checkChild(aggFunc)
+
+ case Count(args) ⇒
+ args.forall(checkChild)
+
+ case Average(arg) ⇒
+ checkChild(arg)
+
+ case Sum(arg) ⇒
+ checkChild(arg)
+
+ case Min(arg) ⇒
+ checkChild(arg)
+
+ case Max(arg) ⇒
+ checkChild(arg)
+
+ case _ ⇒
+ false
+ }
+
+ /**
+ * @inheritdoc
+ *
+ * Returns `None` for any expression that is not one of the aggregates handled here.
+ */
+ override def toString(expr: Expression, childToString: Expression ⇒ String, useQualifier: Boolean,
+ useAlias: Boolean): Option[String] = {
+ //Renders the arguments of an aggregate function, separated by a single space.
+ def renderArgs(exprs: Seq[Expression]): String =
+ exprs.map(childToString).mkString(" ")
+
+ expr match {
+ case AggregateExpression(aggFunc, _, isDistinct, _) ⇒
+ //Only COUNT and SUM take the DISTINCT keyword here; other functions are rendered as is.
+ val distinctKeyword = if (isDistinct) "DISTINCT " else ""
+
+ aggFunc match {
+ case Count(children) ⇒
+ Some(s"COUNT($distinctKeyword${renderArgs(children)})")
+
+ case sum: Sum ⇒
+ Some(castSum(s"SUM($distinctKeyword${renderArgs(sum.children)})", sum.dataType))
+
+ case other ⇒
+ Some(childToString(other))
+ }
+
+ case Average(child) ⇒
+ child.dataType match {
+ case DecimalType() | DoubleType ⇒
+ Some(s"AVG(${childToString(child)})")
+
+ case _ ⇒
+ //Spark `AVG` always returns a double or a decimal.
+ //See [[org.apache.spark.sql.catalyst.expressions.aggregate.Average]]
+ //Ignite `AVG` over an integral column returns an integral value,
+ //so the column is cast to double to keep the query result correct.
+ Some(s"AVG(CAST(${childToString(child)} AS DOUBLE))")
+ }
+
+ case Count(children) ⇒
+ Some(s"COUNT(${renderArgs(children)})")
+
+ case Max(child) ⇒
+ Some(s"MAX(${childToString(child)})")
+
+ case Min(child) ⇒
+ Some(s"MIN(${childToString(child)})")
+
+ case sum: Sum ⇒
+ Some(castSum(s"SUM(${childToString(sum.child)})", sum.dataType))
+
+ case _ ⇒
+ None
+ }
+ }
+
+ /**
+ * Ignite returns BigDecimal for `SUM` while Spark expects BIGINT when the sum type is `LongType`,
+ * so in that case the SQL is wrapped into a cast. Any other type is left untouched.
+ *
+ * @param sumSql SQL of the `SUM` call.
+ * @param dataType Spark data type of the sum.
+ * @return SQL of the `SUM` call, cast to BIGINT if required.
+ */
+ private def castSum(sumSql: String, dataType: DataType): String =
+ if (dataType == LongType)
+ s"CAST($sumSql AS BIGINT)"
+ else
+ sumSql
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/10a4c48b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/optimization/ConditionExpressions.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/impl/optimization/ConditionExpressions.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/optimization/ConditionExpressions.scala
new file mode 100644
index 0000000..fbfbd64
--- /dev/null
+++ b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/optimization/ConditionExpressions.scala
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.spark.impl.optimization
+
+import org.apache.spark.sql.catalyst.expressions.{Expression, _}
+
+/**
+ * Support for condition expressions such as the `and` or `in` operators.
+ */
+private[optimization] object ConditionExpressions extends SupportedExpressions {
+ /**
+ * @inheritdoc
+ *
+ * A condition is supported when all of its operands pass `checkChild`. `In` and `InSet` additionally
+ * require every listed value to be a `Literal`.
+ */
+ def apply(expr: Expression, checkChild: (Expression) ⇒ Boolean): Boolean = expr match {
+ case EqualTo(left, right) ⇒
+ checkChild(left) && checkChild(right)
+
+ case EqualNullSafe(left, right) ⇒
+ checkChild(left) && checkChild(right)
+
+ case GreaterThan(left, right) ⇒
+ checkChild(left) && checkChild(right)
+
+ case GreaterThanOrEqual(left, right) ⇒
+ checkChild(left) && checkChild(right)
+
+ case LessThan(left, right) ⇒
+ checkChild(left) && checkChild(right)
+
+ case LessThanOrEqual(left, right) ⇒
+ checkChild(left) && checkChild(right)
+
+ case InSet(child, set) if set.forall(_.isInstanceOf[Literal]) ⇒
+ checkChild(child)
+
+ case In(child, list) if list.forall(_.isInstanceOf[Literal]) ⇒
+ checkChild(child)
+
+ case IsNull(child) ⇒
+ checkChild(child)
+
+ case IsNotNull(child) ⇒
+ checkChild(child)
+
+ case And(left, right) ⇒
+ checkChild(left) && checkChild(right)
+
+ case Or(left, right) ⇒
+ checkChild(left) && checkChild(right)
+
+ case Not(child) ⇒
+ checkChild(child)
+
+ case StartsWith(left, right) ⇒
+ checkChild(left) && checkChild(right)
+
+ case EndsWith(left, right) ⇒
+ checkChild(left) && checkChild(right)
+
+ case Contains(left, right) ⇒
+ checkChild(left) && checkChild(right)
+
+ case _ ⇒
+ false
+ }
+
+ /**
+ * @inheritdoc
+ *
+ * Logical operators (`AND`, `OR`, `NOT`) are always parenthesized, so a nested expression tree keeps
+ * its structure regardless of SQL operator precedence. Returns `None` for unknown expressions.
+ * NOTE(review): `InSet` is accepted by `apply` but has no rendering here and therefore yields `None`.
+ */
+ override def toString(expr: Expression, childToString: Expression ⇒ String, useQualifier: Boolean,
+ useAlias: Boolean): Option[String] = expr match {
+ case EqualTo(left, right) ⇒
+ Some(s"${childToString(left)} = ${childToString(right)}")
+
+ case EqualNullSafe(left, right) ⇒
+ //Null-safe equality: true when both sides are NULL, false when exactly one side is NULL,
+ //plain equality otherwise. The result is never NULL.
+ val (l, r) = (childToString(left), childToString(right))
+
+ Some(s"(($l IS NULL AND $r IS NULL) OR ($l IS NOT NULL AND $r IS NOT NULL AND $l = $r))")
+
+ case GreaterThan(left, right) ⇒
+ Some(s"${childToString(left)} > ${childToString(right)}")
+
+ case GreaterThanOrEqual(left, right) ⇒
+ Some(s"${childToString(left)} >= ${childToString(right)}")
+
+ case LessThan(left, right) ⇒
+ Some(s"${childToString(left)} < ${childToString(right)}")
+
+ case LessThanOrEqual(left, right) ⇒
+ Some(s"${childToString(left)} <= ${childToString(right)}")
+
+ case In(attr, values) ⇒
+ Some(s"${childToString(attr)} IN (${values.map(childToString(_)).mkString(", ")})")
+
+ case IsNull(child) ⇒
+ Some(s"${childToString(child)} IS NULL")
+
+ case IsNotNull(child) ⇒
+ Some(s"${childToString(child)} IS NOT NULL")
+
+ case And(left, right) ⇒
+ //Parentheses keep `And(Or(a, b), c)` from being flattened into `a OR b AND c`.
+ Some(s"(${childToString(left)} AND ${childToString(right)})")
+
+ case Or(left, right) ⇒
+ Some(s"(${childToString(left)} OR ${childToString(right)})")
+
+ case Not(child) ⇒
+ //Without parentheses `NOT` would bind only to the first operand of a nested `AND`/`OR`.
+ Some(s"NOT (${childToString(child)})")
+
+ case StartsWith(attr, value) ⇒ {
+ //Expecting string literal here.
+ //To add % sign it's required to remove quotes.
+ val valStr = removeQuotes(childToString(value))
+
+ Some(s"${childToString(attr)} LIKE '$valStr%'")
+ }
+
+ case EndsWith(attr, value) ⇒ {
+ //Expecting string literal here.
+ //To add % sign it's required to remove quotes.
+ val valStr = removeQuotes(childToString(value))
+
+ Some(s"${childToString(attr)} LIKE '%$valStr'")
+ }
+
+ case Contains(attr, value) ⇒ {
+ //Expecting string literal here.
+ //To add % signs it's required to remove quotes.
+ val valStr = removeQuotes(childToString(value))
+
+ Some(s"${childToString(attr)} LIKE '%$valStr%'")
+ }
+
+ case _ ⇒
+ None
+ }
+
+ /**
+ * Strings shorter than two characters, or not wrapped in single quotes, are returned unchanged.
+ *
+ * @param str String to process.
+ * @return Str without surrounding single quotes.
+ */
+ private def removeQuotes(str: String): String =
+ if (str.length >= 2 && str.startsWith("'") && str.endsWith("'"))
+ str.substring(1, str.length - 1)
+ else
+ str
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/10a4c48b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/optimization/DateExpressions.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/impl/optimization/DateExpressions.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/optimization/DateExpressions.scala
new file mode 100644
index 0000000..d075bf0
--- /dev/null
+++ b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/optimization/DateExpressions.scala
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spark.impl.optimization
+
+import org.apache.spark.sql.catalyst.expressions.{Expression, _}
+
+/**
+ * Support for expressions that work with date/timestamp values.
+ */
+private[optimization] object DateExpressions extends SupportedExpressions {
+ /**
+ * @inheritdoc
+ *
+ * A date expression is supported when all of its operands pass `checkChild`.
+ * `CurrentDate` is supported only when no time zone is set on it.
+ */
+ def apply(expr: Expression, checkChild: (Expression) ⇒ Boolean): Boolean = expr match {
+ case CurrentDate(None) ⇒
+ true
+
+ case CurrentTimestamp() ⇒
+ true
+
+ case DateAdd(startDate, days) ⇒
+ checkChild(startDate) && checkChild(days)
+
+ case DateDiff(date1, date2) ⇒
+ checkChild(date1) && checkChild(date2)
+
+ case DayOfMonth(date) ⇒
+ checkChild(date)
+
+ case DayOfYear(date) ⇒
+ checkChild(date)
+
+ case Hour(date, _) ⇒
+ checkChild(date)
+
+ case Minute(date, _) ⇒
+ checkChild(date)
+
+ case Month(date) ⇒
+ checkChild(date)
+
+ case ParseToDate(left, format, child) ⇒
+ checkChild(left) && (format.isEmpty || checkChild(format.get)) && checkChild(child)
+
+ case Quarter(date) ⇒
+ checkChild(date)
+
+ case Second(date, _) ⇒
+ checkChild(date)
+
+ case WeekOfYear(date) ⇒
+ checkChild(date)
+
+ case Year(date) ⇒
+ checkChild(date)
+
+ case _ ⇒
+ false
+ }
+
+ /**
+ * @inheritdoc
+ *
+ * Each supported expression is rendered as the corresponding SQL function call.
+ * Returns `None` for unknown expressions.
+ */
+ override def toString(expr: Expression, childToString: Expression ⇒ String, useQualifier: Boolean,
+ useAlias: Boolean): Option[String] = expr match {
+ case CurrentDate(_) ⇒
+ Some(s"CURRENT_DATE()")
+
+ case CurrentTimestamp() ⇒
+ Some(s"CURRENT_TIMESTAMP()")
+
+ case DateAdd(startDate, days) ⇒
+ Some(s"CAST(DATEADD('DAY', ${childToString(days)}, ${childToString(startDate)}) AS DATE)")
+
+ case DateDiff(date1, date2) ⇒
+ //NOTE(review): Spark's `DateDiff` takes (endDate, startDate); confirm that this argument order
+ //gives the same sign as Spark for the target SQL engine.
+ Some(s"CAST(DATEDIFF('DAY', ${childToString(date1)}, ${childToString(date2)}) AS INT)")
+
+ case DayOfMonth(date) ⇒
+ Some(s"DAY_OF_MONTH(${childToString(date)})")
+
+ case DayOfYear(date) ⇒
+ Some(s"DAY_OF_YEAR(${childToString(date)})")
+
+ case Hour(date, _) ⇒
+ Some(s"HOUR(${childToString(date)})")
+
+ case Minute(date, _) ⇒
+ Some(s"MINUTE(${childToString(date)})")
+
+ case Month(date) ⇒
+ //The month must be extracted with MONTH; MINUTE here would return the minute of the value.
+ Some(s"MONTH(${childToString(date)})")
+
+ case ParseToDate(left, formatOption, _) ⇒
+ formatOption match {
+ case Some(format) ⇒
+ Some(s"PARSEDATETIME(${childToString(left)}, ${childToString(format)})")
+ case None ⇒
+ //NOTE(review): verify that the single-argument form is accepted by the target SQL engine.
+ Some(s"PARSEDATETIME(${childToString(left)})")
+ }
+
+ case Quarter(date) ⇒
+ Some(s"QUARTER(${childToString(date)})")
+
+ case Second(date, _) ⇒
+ Some(s"SECOND(${childToString(date)})")
+
+ case WeekOfYear(date) ⇒
+ Some(s"WEEK(${childToString(date)})")
+
+ case Year(date) ⇒
+ Some(s"YEAR(${childToString(date)})")
+
+ case _ ⇒
+ None
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/10a4c48b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/optimization/IgniteQueryContext.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/impl/optimization/IgniteQueryContext.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/optimization/IgniteQueryContext.scala
new file mode 100644
index 0000000..c5a7f34
--- /dev/null
+++ b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/optimization/IgniteQueryContext.scala
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spark.impl.optimization
+
+import org.apache.ignite.spark.IgniteContext
+import org.apache.spark.sql.SQLContext
+import org.apache.spark.sql.catalyst.catalog.CatalogTable
+import org.apache.spark.sql.catalyst.expressions.NamedExpression
+
+/**
+ * Holds the information about an Ignite query that is shared between the steps of the optimization process.
+ *
+ * @param igniteContext IgniteContext.
+ * @param sqlContext SQLContext.
+ * @param cacheName Cache name.
+ * @param aliasIndex Iterator that supplies indexes for auto-generated aliases.
+ *      Every generated alias consumes one element of it.
+ * @param catalogTable CatalogTable from source relation.
+ * @param distributeJoin Defaults to `false`. Not read inside this class; presumably controls Ignite
+ *      distributed joins - verify against the code that consumes it.
+ */
+case class IgniteQueryContext(
+    igniteContext: IgniteContext,
+    sqlContext: SQLContext,
+    cacheName: String,
+    aliasIndex: Iterator[Int],
+    catalogTable: Option[CatalogTable] = None,
+    distributeJoin: Boolean = false
+) {
+    /**
+     * Takes the next index from `aliasIndex` and appends it to the `table` prefix.
+     *
+     * @return Unique table alias.
+     */
+    def uniqueTableAlias: String = s"table${aliasIndex.next}"
+
+    /**
+     * Takes the next index from `aliasIndex` and appends it to the column name.
+     *
+     * @param col Column
+     * @return Unique column alias.
+     */
+    def uniqueColumnAlias(col: NamedExpression): String = s"${col.name}_${aliasIndex.next}"
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/10a4c48b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/optimization/MathExpressions.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/impl/optimization/MathExpressions.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/optimization/MathExpressions.scala
new file mode 100644
index 0000000..dc05e95
--- /dev/null
+++ b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/optimization/MathExpressions.scala
@@ -0,0 +1,263 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spark.impl.optimization
+
+import org.apache.spark.sql.catalyst.expressions.{Expression, _}
+
+/**
+ * Object to support math expressions.
+ *
+ * `apply` tells whether an expression (together with its children) can be executed by Ignite,
+ * `toString` renders such an expression as Ignite (H2) SQL.
+ */
+private[optimization] object MathExpressions extends SupportedExpressions {
+    /** @inheritdoc */
+    def apply(expr: Expression, checkChild: (Expression) ⇒ Boolean): Boolean = expr match {
+        case Abs(child) ⇒
+            checkChild(child)
+
+        case Acos(child) ⇒
+            checkChild(child)
+
+        case Asin(child) ⇒
+            checkChild(child)
+
+        case Atan(child) ⇒
+            checkChild(child)
+
+        case Cos(child) ⇒
+            checkChild(child)
+
+        case Cosh(child) ⇒
+            checkChild(child)
+
+        case Sin(child) ⇒
+            checkChild(child)
+
+        case Sinh(child) ⇒
+            checkChild(child)
+
+        case Tan(child) ⇒
+            checkChild(child)
+
+        case Tanh(child) ⇒
+            checkChild(child)
+
+        case Atan2(left, right) ⇒
+            checkChild(left) && checkChild(right)
+
+        case BitwiseAnd(left, right) ⇒
+            checkChild(left) && checkChild(right)
+
+        case BitwiseOr(left, right) ⇒
+            checkChild(left) && checkChild(right)
+
+        case BitwiseXor(left, right) ⇒
+            checkChild(left) && checkChild(right)
+
+        case Ceil(child) ⇒
+            checkChild(child)
+
+        case ToDegrees(child) ⇒
+            checkChild(child)
+
+        case Exp(child) ⇒
+            checkChild(child)
+
+        case Floor(child) ⇒
+            checkChild(child)
+
+        case Log(child) ⇒
+            checkChild(child)
+
+        case Log10(child) ⇒
+            checkChild(child)
+
+        case Logarithm(left, right) ⇒
+            checkChild(left) && checkChild(right)
+
+        case ToRadians(child) ⇒
+            checkChild(child)
+
+        case Sqrt(child) ⇒
+            checkChild(child)
+
+        case _: Pi ⇒
+            true
+
+        case _: EulerNumber ⇒
+            true
+
+        case Pow(left, right) ⇒
+            checkChild(left) && checkChild(right)
+
+        case Rand(child) ⇒
+            checkChild(child)
+
+        case Round(child, scale) ⇒
+            checkChild(child) && checkChild(scale)
+
+        case Signum(child) ⇒
+            checkChild(child)
+
+        case Remainder(left, right) ⇒
+            checkChild(left) && checkChild(right)
+
+        case Divide(left, right) ⇒
+            checkChild(left) && checkChild(right)
+
+        case Multiply(left, right) ⇒
+            checkChild(left) && checkChild(right)
+
+        case Subtract(left, right) ⇒
+            checkChild(left) && checkChild(right)
+
+        case Add(left, right) ⇒
+            checkChild(left) && checkChild(right)
+
+        case UnaryMinus(child) ⇒
+            checkChild(child)
+
+        case UnaryPositive(child) ⇒
+            checkChild(child)
+
+        case _ ⇒ false
+    }
+
+    /**
+     * Renders a math expression as Ignite (H2) SQL.
+     *
+     * Functions are rendered in call form. Arithmetic operators are rendered in infix/prefix form and
+     * their operands are parenthesised when they are arithmetic themselves (see `operandToString`).
+     *
+     * @param expr Expression to convert to string.
+     * @param childToString Closure to convert children expressions.
+     * @param useQualifier Not used by math expressions.
+     * @param useAlias Not used by math expressions.
+     * @return SQL representation of `expr`, or `None` if `expr` is not a math expression handled here.
+     */
+    override def toString(expr: Expression, childToString: Expression ⇒ String, useQualifier: Boolean,
+        useAlias: Boolean): Option[String] = expr match {
+        case Abs(child) ⇒
+            Some(s"ABS(${childToString(child)})")
+
+        case Acos(child) ⇒
+            Some(s"ACOS(${childToString(child)})")
+
+        case Asin(child) ⇒
+            Some(s"ASIN(${childToString(child)})")
+
+        case Atan(child) ⇒
+            Some(s"ATAN(${childToString(child)})")
+
+        case Cos(child) ⇒
+            Some(s"COS(${childToString(child)})")
+
+        case Cosh(child) ⇒
+            Some(s"COSH(${childToString(child)})")
+
+        case Sin(child) ⇒
+            Some(s"SIN(${childToString(child)})")
+
+        case Sinh(child) ⇒
+            Some(s"SINH(${childToString(child)})")
+
+        case Tan(child) ⇒
+            Some(s"TAN(${childToString(child)})")
+
+        case Tanh(child) ⇒
+            Some(s"TANH(${childToString(child)})")
+
+        case Atan2(left, right) ⇒
+            Some(s"ATAN2(${childToString(left)}, ${childToString(right)})")
+
+        case BitwiseAnd(left, right) ⇒
+            Some(s"BITAND(${childToString(left)}, ${childToString(right)})")
+
+        case BitwiseOr(left, right) ⇒
+            Some(s"BITOR(${childToString(left)}, ${childToString(right)})")
+
+        case BitwiseXor(left, right) ⇒
+            Some(s"BITXOR(${childToString(left)}, ${childToString(right)})")
+
+        case Ceil(child) ⇒
+            Some(s"CAST(CEIL(${childToString(child)}) AS LONG)")
+
+        case ToDegrees(child) ⇒
+            Some(s"DEGREES(${childToString(child)})")
+
+        case Exp(child) ⇒
+            Some(s"EXP(${childToString(child)})")
+
+        case Floor(child) ⇒
+            Some(s"CAST(FLOOR(${childToString(child)}) AS LONG)")
+
+        case Log(child) ⇒
+            Some(s"LOG(${childToString(child)})")
+
+        case Log10(child) ⇒
+            Some(s"LOG10(${childToString(child)})")
+
+        case Logarithm(base, arg) ⇒
+            childToString(base) match {
+                //Spark internally converts LN(XXX) to LOG(2.718281828459045, XXX).
+                //H2 has no built-in function for a logarithm with an arbitrary base,
+                //so log(base, arg) = ln(arg)/ln(base) is used only when there is no direct equivalent.
+                case "2.718281828459045" ⇒
+                    Some(s"LOG(${childToString(arg)})")
+
+                case "10" ⇒
+                    Some(s"LOG10(${childToString(arg)})")
+
+                case baseStr ⇒
+                    Some(s"(LOG(${childToString(arg)})/LOG($baseStr))")
+            }
+
+        case ToRadians(child) ⇒
+            Some(s"RADIANS(${childToString(child)})")
+
+        case Sqrt(child) ⇒
+            Some(s"SQRT(${childToString(child)})")
+
+        case _: Pi ⇒
+            Some("PI()")
+
+        case _: EulerNumber ⇒
+            Some("E()")
+
+        case Pow(left, right) ⇒
+            Some(s"POWER(${childToString(left)}, ${childToString(right)})")
+
+        case Rand(child) ⇒
+            Some(s"RAND(${childToString(child)})")
+
+        case Round(child, scale) ⇒
+            Some(s"ROUND(${childToString(child)}, ${childToString(scale)})")
+
+        case Signum(child) ⇒
+            Some(s"SIGN(${childToString(child)})")
+
+        case Remainder(left, right) ⇒
+            Some(s"${operandToString(left, childToString)} % ${operandToString(right, childToString)}")
+
+        case Divide(left, right) ⇒
+            Some(s"${operandToString(left, childToString)} / ${operandToString(right, childToString)}")
+
+        case Multiply(left, right) ⇒
+            Some(s"${operandToString(left, childToString)} * ${operandToString(right, childToString)}")
+
+        case Subtract(left, right) ⇒
+            Some(s"${operandToString(left, childToString)} - ${operandToString(right, childToString)}")
+
+        case Add(left, right) ⇒
+            Some(s"${operandToString(left, childToString)} + ${operandToString(right, childToString)}")
+
+        case UnaryMinus(child) ⇒
+            Some(s"-${operandToString(child, childToString)}")
+
+        case UnaryPositive(child) ⇒
+            Some(s"+${operandToString(child, childToString)}")
+
+        case _ ⇒
+            None
+    }
+
+    /**
+     * Renders an operand of an infix or prefix arithmetic operator.
+     *
+     * An operand that is itself an infix/prefix arithmetic expression is wrapped in parentheses. Without
+     * them `Multiply(Add(a, b), c)` would be rendered as `a + b * c` and `UnaryMinus(UnaryMinus(a))`
+     * as `--a`, i.e. the SQL would not follow the structure of the expression tree.
+     * Any other operand is rendered as is, so non-nested expressions keep their previous form.
+     *
+     * @param operand Operand expression.
+     * @param childToString Closure to convert children expressions.
+     * @return SQL representation of the operand, parenthesised if required.
+     */
+    private def operandToString(operand: Expression, childToString: Expression ⇒ String): String =
+        operand match {
+            case _: Add | _: Subtract | _: Multiply | _: Divide | _: Remainder |
+                 _: UnaryMinus | _: UnaryPositive ⇒
+                s"(${childToString(operand)})"
+
+            case _ ⇒
+                childToString(operand)
+        }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/10a4c48b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/optimization/SimpleExpressions.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/impl/optimization/SimpleExpressions.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/optimization/SimpleExpressions.scala
new file mode 100644
index 0000000..a1c9458
--- /dev/null
+++ b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/optimization/SimpleExpressions.scala
@@ -0,0 +1,180 @@
+package org.apache.ignite.spark.impl.optimization
+
+import java.text.SimpleDateFormat
+
+import org.apache.spark.sql.catalyst.expressions.{Expression, _}
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.types._
+
+/**
+ * Object to support some 'simple' expressions like literals, attributes, aliases and casts.
+ */
+private[optimization] object SimpleExpressions extends SupportedExpressions {
+    /** @inheritdoc */
+    override def apply(expr: Expression, checkChild: Expression ⇒ Boolean): Boolean = expr match {
+        case Literal(_, _) ⇒
+            true
+
+        case _: Attribute ⇒
+            true
+
+        case Alias(child, _) ⇒
+            checkChild(child)
+
+        case Cast(child, dataType, _) ⇒
+            checkChild(child) && castSupported(from = child.dataType, to = dataType)
+
+        case _ ⇒
+            false
+    }
+
+    /**
+     * Renders a 'simple' expression as Ignite (H2) SQL.
+     *
+     * @param expr Expression to convert to string.
+     * @param childToString Closure to convert children expressions.
+     * @param useQualifier If true attribute names are prefixed with their qualifier, when they have one.
+     * @param useAlias If true aliases are printed (`name AS person_name`).
+     * @return SQL representation of `expr`, or `None` if `expr` is not handled here.
+     */
+    override def toString(expr: Expression, childToString: Expression ⇒ String, useQualifier: Boolean,
+        useAlias: Boolean): Option[String] = expr match {
+        //A null literal of any type is printed as SQL null.
+        //It has to be handled before the type specific cases, because they call `toString` on the value.
+        case l: Literal if l.value == null ⇒
+            Some("null")
+
+        case l: Literal ⇒ l.dataType match {
+            case StringType ⇒
+                //A single quote inside of a SQL string literal has to be doubled.
+                Some("'" + l.value.toString.replace("'", "''") + "'")
+
+            case TimestampType ⇒
+                l.value match {
+                    //Internal representation of TimestampType is Long.
+                    //So we convert from the internal spark representation to a CAST call.
+                    case date: Long ⇒
+                        Some(s"CAST('${timestampFormat.get.format(DateTimeUtils.toJavaTimestamp(date))}' " +
+                            s"AS TIMESTAMP)")
+
+                    case _ ⇒
+                        Some(l.value.toString)
+                }
+
+            case DateType ⇒
+                l.value match {
+                    //Internal representation of DateType is Int.
+                    //So we convert from the internal spark representation to a CAST call.
+                    case days: Integer ⇒
+                        val date = new java.util.Date(DateTimeUtils.daysToMillis(days))
+
+                        Some(s"CAST('${dateFormat.get.format(date)}' AS DATE)")
+
+                    case _ ⇒
+                        Some(l.value.toString)
+                }
+
+            case _ ⇒
+                Some(l.value.toString)
+        }
+
+        case ar: AttributeReference ⇒
+            val name =
+                if (useQualifier)
+                    ar.qualifier.map(_ + "." + ar.name).getOrElse(ar.name)
+                else
+                    ar.name
+
+            if (ar.metadata.contains(ALIAS) &&
+                !isAliasEqualColumnName(ar.metadata.getString(ALIAS), ar.name) &&
+                useAlias)
+                Some(aliasToString(name, ar.metadata.getString(ALIAS)))
+            else
+                Some(name)
+
+        case Alias(child, name) ⇒
+            if (useAlias)
+                Some(aliasToString(childToString(child), name))
+            else
+                Some(childToString(child))
+
+        case Cast(child, dataType, _) ⇒
+            Some(s"CAST(${childToString(child)} AS ${toSqlType(dataType)})")
+
+        case SortOrder(child, direction, _, _) ⇒
+            Some(s"${childToString(child)}${if (direction == Descending) " DESC" else ""}")
+
+        case _ ⇒
+            None
+    }
+
+    /**
+     * @param column Column name.
+     * @param alias Alias.
+     * @return SQL String for column with alias. The alias is omitted when it equals the column name and
+     *      is double quoted when it is not a plain identifier.
+     */
+    private def aliasToString(column: String, alias: String): String =
+        if (isAliasEqualColumnName(alias, column))
+            column
+        else if (alias.matches("[A-Za-z_][0-9A-Za-z_]*"))
+            s"$column AS $alias"
+        else
+            s"""$column AS "$alias""""
+
+    /**
+     * @param alias Alias.
+     * @param column Column.
+     * @return True if the alias equals to the column name (ignoring case and single quotes in the column),
+     *      false otherwise.
+     */
+    private def isAliasEqualColumnName(alias: String, column: String): Boolean =
+        alias.compareToIgnoreCase(column.replaceAll("'", "")) == 0
+
+    /**
+     * @param from From type conversion.
+     * @param to To type conversion.
+     * @return True if cast support for types, false otherwise.
+     *      Source types that are not listed (null, struct, map, etc.) are reported as not supported.
+     */
+    private def castSupported(from: DataType, to: DataType): Boolean = from match {
+        //NOTE(review): `DecimalType(_, _)` inside of the sets below is placeholder syntax, so it looks like
+        //the sets contain a function value and not a type, which can never be equal to `to`.
+        //Casts to decimal are therefore presumably always reported as not supported.
+        //Confirm the intent (and that `toSqlType` handles decimals) before changing it.
+        case BooleanType ⇒
+            Set[DataType](BooleanType, StringType)(to)
+
+        case ByteType ⇒
+            Set(ByteType, ShortType, IntegerType, LongType, FloatType, DoubleType, StringType,
+                DecimalType(_, _))(to)
+
+        case ShortType ⇒
+            Set(ShortType, IntegerType, LongType, FloatType, DoubleType, StringType, DecimalType(_, _))(to)
+
+        case IntegerType ⇒
+            Set(IntegerType, LongType, FloatType, DoubleType, StringType, DecimalType(_, _))(to)
+
+        case LongType ⇒
+            Set(LongType, FloatType, DoubleType, StringType, DecimalType(_, _))(to)
+
+        case FloatType ⇒
+            Set(FloatType, DoubleType, StringType, DecimalType(_, _))(to)
+
+        case DoubleType ⇒
+            Set(DoubleType, StringType, DecimalType(_, _))(to)
+
+        case DecimalType() ⇒
+            Set(StringType, DecimalType(_, _))(to)
+
+        case DateType ⇒
+            Set[DataType](DateType, StringType, LongType, TimestampType)(to)
+
+        case TimestampType ⇒
+            Set[DataType](TimestampType, DateType, StringType, LongType)(to)
+
+        case StringType ⇒
+            Set(BooleanType, ByteType, ShortType, IntegerType, LongType, FloatType, DoubleType,
+                DecimalType(_, _), DateType, TimestampType, StringType)(to)
+
+        case BinaryType ⇒
+            false
+
+        case ArrayType(_, _) ⇒
+            false
+
+        //Any other source type can't be cast by Ignite.
+        //Without this case the match fails with a MatchError instead of rejecting the expression.
+        case _ ⇒
+            false
+    }
+
+    /**
+     * Date format built-in Ignite.
+     * SimpleDateFormat is not thread safe, so every thread gets its own instance.
+     */
+    private val dateFormat: ThreadLocal[SimpleDateFormat] = new ThreadLocal[SimpleDateFormat] {
+        override def initialValue(): SimpleDateFormat =
+            new SimpleDateFormat("yyyy-MM-dd")
+    }
+
+    /**
+     * Timestamp format built-in Ignite.
+     * SimpleDateFormat is not thread safe, so every thread gets its own instance.
+     */
+    private val timestampFormat: ThreadLocal[SimpleDateFormat] = new ThreadLocal[SimpleDateFormat] {
+        override def initialValue(): SimpleDateFormat =
+            new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS")
+    }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/10a4c48b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/optimization/StringExpressions.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/impl/optimization/StringExpressions.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/optimization/StringExpressions.scala
new file mode 100644
index 0000000..1ecab2c
--- /dev/null
+++ b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/optimization/StringExpressions.scala
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spark.impl.optimization
+
+import org.apache.spark.sql.catalyst.expressions.{Expression, _}
+
+/**
+ * Object to support expressions to work with strings like `length` or `trim`.
+ *
+ * `apply` tells whether an expression (together with its children) can be executed by Ignite,
+ * `toString` renders such an expression as Ignite (H2) SQL.
+ */
+private[optimization] object StringExpressions extends SupportedExpressions {
+    /** @inheritdoc */
+    def apply(expr: Expression, checkChild: (Expression) ⇒ Boolean): Boolean = expr match {
+        case Ascii(child) ⇒
+            checkChild(child)
+
+        case Length(child) ⇒
+            checkChild(child)
+
+        case Concat(children) ⇒
+            children.forall(checkChild)
+
+        case ConcatWs(children) ⇒
+            children.forall(checkChild)
+
+        case StringInstr(str, substr) ⇒
+            checkChild(str) && checkChild(substr)
+
+        case Lower(child) ⇒
+            checkChild(child)
+
+        case Upper(child) ⇒
+            checkChild(child)
+
+        case StringLocate(substr, str, start) ⇒
+            checkChild(substr) && checkChild(str) && checkChild(start)
+
+        case StringLPad(str, len, pad) ⇒
+            checkChild(str) && checkChild(len) && checkChild(pad)
+
+        case StringRPad(str, len, pad) ⇒
+            checkChild(str) && checkChild(len) && checkChild(pad)
+
+        case StringTrimLeft(child) ⇒
+            checkChild(child)
+
+        case StringTrimRight(child) ⇒
+            checkChild(child)
+
+        case StringTrim(child) ⇒
+            checkChild(child)
+
+        case RegExpReplace(subject, regexp, rep) ⇒
+            checkChild(subject) && checkChild(regexp) && checkChild(rep)
+
+        case StringRepeat(str, times) ⇒
+            checkChild(str) && checkChild(times)
+
+        case SoundEx(child) ⇒
+            checkChild(child)
+
+        case StringSpace(child) ⇒
+            checkChild(child)
+
+        case Substring(str, pos, len) ⇒
+            checkChild(str) && checkChild(pos) && checkChild(len)
+
+        case StringTranslate(str, strMatch, strReplace) ⇒
+            checkChild(str) && checkChild(strMatch) && checkChild(strReplace)
+
+        case _ ⇒ false
+    }
+
+    /**
+     * Renders a string expression as Ignite (H2) SQL.
+     *
+     * @param expr Expression to convert to string.
+     * @param childToString Closure to convert children expressions.
+     * @param useQualifier Not used by string expressions.
+     * @param useAlias Not used by string expressions.
+     * @return SQL representation of `expr`, or `None` if `expr` is not a string expression handled here.
+     */
+    override def toString(expr: Expression, childToString: Expression ⇒ String, useQualifier: Boolean,
+        useAlias: Boolean): Option[String] = expr match {
+        case Ascii(child) ⇒
+            Some(s"ASCII(${childToString(child)})")
+
+        case Length(child) ⇒
+            Some(s"CAST(LENGTH(${childToString(child)}) AS INTEGER)")
+
+        case Concat(children) ⇒
+            Some(s"CONCAT(${children.map(childToString(_)).mkString(", ")})")
+
+        case ConcatWs(children) ⇒
+            Some(s"CONCAT_WS(${children.map(childToString(_)).mkString(", ")})")
+
+        //POSITION takes the string to search for as the first argument.
+        case StringInstr(str, substr) ⇒
+            Some(s"POSITION(${childToString(substr)}, ${childToString(str)})")
+
+        case Lower(child) ⇒
+            Some(s"LOWER(${childToString(child)})")
+
+        case Upper(child) ⇒
+            Some(s"UPPER(${childToString(child)})")
+
+        case StringLocate(substr, str, start) ⇒
+            Some(s"LOCATE(${childToString(substr)}, ${childToString(str)}, ${childToString(start)})")
+
+        case StringLPad(str, len, pad) ⇒
+            Some(s"LPAD(${childToString(str)}, ${childToString(len)}, ${childToString(pad)})")
+
+        case StringRPad(str, len, pad) ⇒
+            Some(s"RPAD(${childToString(str)}, ${childToString(len)}, ${childToString(pad)})")
+
+        case StringTrimLeft(child) ⇒
+            Some(s"LTRIM(${childToString(child)})")
+
+        case StringTrimRight(child) ⇒
+            Some(s"RTRIM(${childToString(child)})")
+
+        case StringTrim(child) ⇒
+            Some(s"TRIM(${childToString(child)})")
+
+        case RegExpReplace(subject, regexp, rep) ⇒
+            Some(s"REGEXP_REPLACE(${childToString(subject)}, ${childToString(regexp)}, ${childToString(rep)})")
+
+        case StringRepeat(str, times) ⇒
+            Some(s"REPEAT(${childToString(str)}, ${childToString(times)})")
+
+        //The H2 function is named SOUNDEX, there is no SOUND_EX function.
+        case SoundEx(child) ⇒
+            Some(s"SOUNDEX(${childToString(child)})")
+
+        case StringSpace(child) ⇒
+            Some(s"SPACE(${childToString(child)})")
+
+        case Substring(str, pos, len) ⇒
+            Some(s"SUBSTR(${childToString(str)}, ${childToString(pos)}, ${childToString(len)})")
+
+        case StringTranslate(str, strMatch, strReplace) ⇒
+            Some(s"TRANSLATE(${childToString(str)}, ${childToString(strMatch)}, ${childToString(strReplace)})")
+
+        case _ ⇒
+            None
+    }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/10a4c48b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/optimization/SupportedExpressions.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/impl/optimization/SupportedExpressions.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/optimization/SupportedExpressions.scala
new file mode 100644
index 0000000..f46eb72
--- /dev/null
+++ b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/optimization/SupportedExpressions.scala
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spark.impl.optimization
+
+import org.apache.spark.sql.catalyst.expressions.Expression
+
+/**
+ * Contract for a group of Spark SQL expressions that can be expressed with Ignite SQL syntax.
+ * An implementation both recognises the expressions of its group and prints them as SQL.
+ */
+private[optimization] trait SupportedExpressions {
+    /**
+     * Checks whether an expression is supported.
+     *
+     * @param expr Expression to check.
+     * @param checkChild Closure to check child expression.
+     * @return True if `expr` is supported, false otherwise.
+     */
+    def apply(expr: Expression, checkChild: Expression ⇒ Boolean): Boolean
+
+    /**
+     * Prints an expression as SQL.
+     *
+     * @param expr Expression to convert to string.
+     * @param childToString Closure to convert children expressions.
+     * @param useQualifier If true `expr` should be printed using qualifier. `Table1.id` for example.
+     * @param useAlias If true `expr` should be printed with alias. `name as person_name` for example.
+     * @return SQL representation of `expr` if it is supported. `None` otherwise.
+     */
+    def toString(
+        expr: Expression,
+        childToString: Expression ⇒ String,
+        useQualifier: Boolean,
+        useAlias: Boolean
+    ): Option[String]
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/10a4c48b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/optimization/SystemExpressions.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/impl/optimization/SystemExpressions.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/optimization/SystemExpressions.scala
new file mode 100644
index 0000000..40e4e29
--- /dev/null
+++ b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/optimization/SystemExpressions.scala
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spark.impl.optimization
+
+import org.apache.ignite.IgniteException
+import org.apache.spark.sql.catalyst.expressions.{Coalesce, EqualTo, Expression, Greatest, If, IfNull, IsNotNull, IsNull, Least, Literal, NullIf, Nvl2}
+
+/**
+ * Object to support some built-in expressions like `nvl2` or `coalesce`.
+ *
+ * An `If` expression is supported only in the forms that can be printed as `NVL2` or `NULLIF`.
+ */
+private[optimization] object SystemExpressions extends SupportedExpressions {
+    /** @inheritdoc */
+    override def apply(expr: Expression, checkChild: Expression ⇒ Boolean): Boolean = expr match {
+        case Coalesce(children) ⇒
+            children.forall(checkChild)
+
+        case Greatest(children) ⇒
+            children.forall(checkChild)
+
+        case Least(children) ⇒
+            children.forall(checkChild)
+
+        case IfNull(left, right, _) ⇒
+            Seq(left, right).forall(checkChild)
+
+        case NullIf(left, right, _) ⇒
+            Seq(left, right).forall(checkChild)
+
+        case Nvl2(expr1, expr2, expr3, _) ⇒
+            Seq(expr1, expr2, expr3).forall(checkChild)
+
+        //IF(x IS NOT NULL, a, b) - printed as NVL2.
+        case If(IsNotNull(child), trueValue, falseValue) ⇒
+            Seq(child, trueValue, falseValue).forall(checkChild)
+
+        //IF(x IS NULL, a, b) - printed as NVL2.
+        case If(IsNull(child), trueValue, falseValue) ⇒
+            Seq(child, trueValue, falseValue).forall(checkChild)
+
+        //IF(a = b, null, a) or IF(a = b, null, b) - printed as NULLIF.
+        case If(EqualTo(left, right), Literal(null, _), falseValue) ⇒
+            (left == falseValue || right == falseValue) && checkChild(left) && checkChild(right)
+
+        //Any other expression, including any other form of IF.
+        case _ ⇒
+            false
+    }
+
+    /** @inheritdoc */
+    override def toString(expr: Expression, childToString: Expression ⇒ String, useQualifier: Boolean,
+        useAlias: Boolean): Option[String] = {
+        //Prints `name(arg1, arg2, ...)`. Arguments are converted from left to right.
+        def call(name: String, args: Seq[Expression]): Option[String] =
+            Some(s"$name(${args.map(childToString).mkString(", ")})")
+
+        //Thrown for IF expressions that have no Ignite representation.
+        def notSupported: Nothing =
+            throw new IgniteException(s"Expression not supported. $expr")
+
+        expr match {
+            case Coalesce(children) ⇒
+                call("COALESCE", children)
+
+            case Greatest(children) ⇒
+                call("GREATEST", children)
+
+            case IfNull(left, right, _) ⇒
+                call("IFNULL", Seq(left, right))
+
+            case Least(children) ⇒
+                call("LEAST", children)
+
+            case NullIf(left, right, _) ⇒
+                call("NULLIF", Seq(left, right))
+
+            case Nvl2(expr1, expr2, expr3, _) ⇒
+                call("NVL2", Seq(expr1, expr2, expr3))
+
+            case If(IsNotNull(child), trueValue, falseValue) ⇒
+                call("NVL2", Seq(child, trueValue, falseValue))
+
+            //NVL2 returns its second argument for a not null value, so the branches are swapped.
+            case If(IsNull(child), trueValue, falseValue) ⇒
+                call("NVL2", Seq(child, falseValue, trueValue))
+
+            case If(EqualTo(left, right), Literal(null, _), falseValue) if left == falseValue ⇒
+                call("NULLIF", Seq(left, right))
+
+            case If(EqualTo(left, right), Literal(null, _), falseValue) if right == falseValue ⇒
+                call("NULLIF", Seq(right, left))
+
+            case _: If ⇒
+                notSupported
+
+            case _ ⇒
+                None
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/10a4c48b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/optimization/accumulator/JoinSQLAccumulator.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/impl/optimization/accumulator/JoinSQLAccumulator.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/optimization/accumulator/JoinSQLAccumulator.scala
new file mode 100644
index 0000000..7ae5e70
--- /dev/null
+++ b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/optimization/accumulator/JoinSQLAccumulator.scala
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spark.impl.optimization.accumulator
+
+import org.apache.ignite.IgniteException
+import org.apache.ignite.spark.impl.optimization._
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Expression, NamedExpression, SortOrder}
+import org.apache.spark.sql.catalyst.plans.logical.BinaryNode
+import org.apache.spark.sql.catalyst.plans.{Inner, JoinType, LeftOuter, RightOuter}
+
+/**
+ * Accumulator to store information about join query.
+ *
+ * @param igniteQueryContext Ignite query context.
+ * @param left Accumulator of the left side of the join.
+ * @param right Accumulator of the right side of the join.
+ * @param joinType Join type. Only inner, left outer and right outer joins can be compiled to SQL.
+ * @param outputExpressions Expressions of the SELECT list.
+ * @param condition Join condition, printed as the ON clause if defined.
+ * @param leftAlias Alias of the left side. Required if the left side is not a simple table.
+ * @param rightAlias Alias of the right side. Required if the right side is not a simple table
+ *      or if both sides refer to the same table name.
+ * @param distinct If true `SELECT DISTINCT` is generated.
+ * @param where Filters of the join query itself.
+ * @param groupBy GROUP BY expressions.
+ * @param having HAVING expressions.
+ * @param limit LIMIT expression.
+ * @param localLimit Local limit expression. It is stored, but not used by `compileQuery`.
+ * @param orderBy ORDER BY expressions.
+ */
+private[apache] case class JoinSQLAccumulator(
+    igniteQueryContext: IgniteQueryContext,
+    left: QueryAccumulator,
+    right: QueryAccumulator,
+    joinType: JoinType,
+    outputExpressions: Seq[NamedExpression],
+    condition: Option[Expression],
+    leftAlias: Option[String],
+    rightAlias: Option[String],
+    distinct: Boolean = false,
+    where: Option[Seq[Expression]] = None,
+    groupBy: Option[Seq[Expression]] = None,
+    having: Option[Seq[Expression]] = None,
+    limit: Option[Expression] = None,
+    localLimit: Option[Expression] = None,
+    orderBy: Option[Seq[SortOrder]] = None
+) extends BinaryNode with SelectAccumulator {
+    /** @inheritdoc */
+    override def compileQuery(prettyPrint: Boolean = false): String = {
+        val delim = if (prettyPrint) "\n" else " "
+        val tab = if (prettyPrint) " " else ""
+
+        //The `distinct` flag has to be reflected in the query, otherwise it is silently lost.
+        val select = if (distinct) "SELECT DISTINCT" else "SELECT"
+
+        var sql = s"$select$delim$tab" +
+            s"${fixQualifier(outputExpressions).map(exprToString(_, useQualifier = true)).mkString(", ")}$delim" +
+            s"FROM$delim$tab$compileJoinExpr"
+
+        if (allFilters.nonEmpty)
+            sql += s"${delim}WHERE$delim$tab" +
+                s"${fixQualifier(allFilters).map(exprToString(_, useQualifier = true)).mkString(s" AND$delim$tab")}"
+
+        if (groupBy.exists(_.nonEmpty))
+            sql += s"${delim}GROUP BY " +
+                s"${fixQualifier(groupBy.get).map(exprToString(_, useQualifier = true)).mkString(s",$delim$tab")}"
+
+        if (having.exists(_.nonEmpty))
+            sql += s"${delim}HAVING " +
+                s"${fixQualifier(having.get).map(exprToString(_, useQualifier = true)).mkString(s" AND$delim$tab")}"
+
+        if (orderBy.exists(_.nonEmpty))
+            sql += s"${delim}ORDER BY " +
+                s"${fixQualifier(orderBy.get).map(exprToString(_, useQualifier = true)).mkString(s",$delim$tab")}"
+
+        if (limit.isDefined)
+            sql += s" LIMIT ${exprToString(fixQualifier0(limit.get), useQualifier = true)}"
+
+        sql
+    }
+
+    /**
+     * Filters of a side that is a simple table are not compiled as part of a sub query,
+     * so they are added to the WHERE clause of the join query.
+     *
+     * @return Filters for this query.
+     */
+    private def allFilters: Seq[Expression] = {
+        val leftFilters =
+            if (isSimpleTableAcc(left))
+                left.asInstanceOf[SingleTableSQLAccumulator].where.getOrElse(Seq.empty)
+            else
+                Seq.empty
+
+        val rightFilters =
+            if (isSimpleTableAcc(right))
+                right.asInstanceOf[SingleTableSQLAccumulator].where.getOrElse(Seq.empty)
+            else
+                Seq.empty
+
+        where.getOrElse(Seq.empty) ++ leftFilters ++ rightFilters
+    }
+
+    /**
+     * @return Name to refer to the left side of the join:
+     *      qualifier of the left accumulator for a simple table, `leftAlias` otherwise.
+     */
+    private def leftTableName: String =
+        if (isSimpleTableAcc(left))
+            left.qualifier
+        else
+            leftAlias.get
+
+    /**
+     * @return `table1 LEFT JOIN (SELECT....FROM...) table2` part of join query.
+     */
+    private def compileJoinExpr: String = {
+        val leftJoinSql =
+            if (isSimpleTableAcc(left))
+                left.asInstanceOf[SingleTableSQLAccumulator].table.get
+            else
+                s"(${left.compileQuery()}) ${leftAlias.get}"
+
+        val rightJoinSql =
+            if (isSimpleTableAcc(right)) {
+                val rightTableName = right.asInstanceOf[SingleTableSQLAccumulator].table.get
+
+                //A table that is joined with itself has to be distinguished by the alias.
+                if (leftTableName == rightTableName)
+                    s"$rightTableName as ${rightAlias.get}"
+                else
+                    rightTableName
+            } else
+                s"(${right.compileQuery()}) ${rightAlias.get}"
+
+        s"$leftJoinSql $joinTypeSQL $rightJoinSql" +
+            s"${condition.map(expr ⇒ s" ON ${exprToString(fixQualifier0(expr), useQualifier = true)}").getOrElse("")}"
+    }
+
+    /**
+     * @return SQL string representing specific join type.
+     */
+    private def joinTypeSQL = joinType match {
+        case Inner ⇒
+            "JOIN"
+
+        case LeftOuter ⇒
+            "LEFT JOIN"
+
+        case RightOuter ⇒
+            "RIGHT JOIN"
+
+        case _ ⇒
+            throw new IgniteException(s"Unsupported join type $joinType")
+    }
+
+    /**
+     * Changes table qualifier in case of embedded query.
+     *
+     * @param exprs Expressions to fix.
+     * @tparam T type of input expression.
+     * @return copy of `exprs` with fixed qualifier.
+     */
+    private def fixQualifier[T <: Expression](exprs: Seq[T]): Seq[T] =
+        exprs.map(fixQualifier0)
+
+    /**
+     * Changes table qualifier for single expression.
+     * An attribute reference gets the qualifier found by `findQualifier`,
+     * any other expression is rebuilt with fixed children.
+     *
+     * @param expr Expression to fix.
+     * @tparam T type of input expression.
+     * @return copy of `expr` with fixed qualifier.
+     */
+    private def fixQualifier0[T <: Expression](expr: T): T = expr match {
+        case attr: AttributeReference ⇒
+            attr.withQualifier(Some(findQualifier(attr))).asInstanceOf[T]
+
+        case _ ⇒
+            expr.withNewChildren(fixQualifier(expr.children)).asInstanceOf[T]
+    }
+
+    /**
+     * Find right qualifier for a `attr`.
+     * An attribute that is produced by the left side gets the name of the left side.
+     * Any other attribute gets the qualifier of the right side if it is a simple table with
+     * a name different from the left one, `rightAlias` otherwise.
+     *
+     * @param attr Attribute to fix qualifier in
+     * @return Right qualifier for a `attr`
+     */
+    private def findQualifier(attr: AttributeReference): String =
+        if (left.outputExpressions.exists(_.exprId == attr.exprId))
+            leftTableName
+        else if (isSimpleTableAcc(right) && right.qualifier != leftTableName)
+            right.qualifier
+        else
+            rightAlias.get
+
+    /** @inheritdoc */
+    override def simpleString: String =
+        s"JoinSQLAccumulator(joinType: $joinType, columns: $outputExpressions, condition: $condition)"
+
+    /** @inheritdoc */
+    override def withOutputExpressions(outputExpressions: Seq[NamedExpression]): SelectAccumulator =
+        copy(outputExpressions = outputExpressions)
+
+    /** @inheritdoc */
+    override def withDistinct(distinct: Boolean): JoinSQLAccumulator = copy(distinct = distinct)
+
+    /** @inheritdoc */
+    override def withWhere(where: Seq[Expression]): JoinSQLAccumulator = copy(where = Some(where))
+
+    /** @inheritdoc */
+    override def withGroupBy(groupBy: Seq[Expression]): JoinSQLAccumulator = copy(groupBy = Some(groupBy))
+
+    /** @inheritdoc */
+    override def withHaving(having: Seq[Expression]): JoinSQLAccumulator = copy(having = Some(having))
+
+    /** @inheritdoc */
+    override def withLimit(limit: Expression): JoinSQLAccumulator = copy(limit = Some(limit))
+
+    /** @inheritdoc */
+    override def withLocalLimit(localLimit: Expression): JoinSQLAccumulator = copy(localLimit = Some(localLimit))
+
+    /** @inheritdoc */
+    override def withOrderBy(orderBy: Seq[SortOrder]): JoinSQLAccumulator = copy(orderBy = Some(orderBy))
+
+    /** @inheritdoc */
+    override def output: Seq[Attribute] = outputExpressions.map(toAttributeReference(_, Seq.empty))
+
+    /** @inheritdoc */
+    override lazy val qualifier: String = igniteQueryContext.uniqueTableAlias
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/10a4c48b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/optimization/accumulator/QueryAccumulator.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/impl/optimization/accumulator/QueryAccumulator.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/optimization/accumulator/QueryAccumulator.scala
new file mode 100644
index 0000000..133d355
--- /dev/null
+++ b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/optimization/accumulator/QueryAccumulator.scala
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spark.impl.optimization.accumulator
+
+import org.apache.ignite.spark.impl.optimization.IgniteQueryContext
+import org.apache.spark.sql.catalyst.expressions.{NamedExpression, SortOrder}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+
+/**
+ * Common contract of the logical plan nodes that accumulate the parts of a query.
+ */
+private[apache] trait QueryAccumulator extends LogicalPlan {
+    /**
+     * @return Ignite query context of this accumulator.
+     */
+    def igniteQueryContext: IgniteQueryContext
+
+    /**
+     * @return Expressions that form the output of the query.
+     */
+    def outputExpressions: Seq[NamedExpression]
+
+    /**
+     * @return Ordering of the query result, if one is set.
+     */
+    def orderBy: Option[Seq[SortOrder]]
+
+    /**
+     * @return Qualifier that should be used to select data from this accumulator.
+     */
+    def qualifier: String
+
+    /**
+     * @param prettyPrint If true, a human readable query is generated.
+     * @return SQL query.
+     */
+    def compileQuery(prettyPrint: Boolean = false): String
+
+    /**
+     * @param outputExpressions New output expressions.
+     * @return Copy of this accumulator with the new output.
+     */
+    def withOutputExpressions(outputExpressions: Seq[NamedExpression]): QueryAccumulator
+
+    /**
+     * @param orderBy New ordering.
+     * @return Copy of this accumulator with the new ordering.
+     */
+    def withOrderBy(orderBy: Seq[SortOrder]): QueryAccumulator
+
+    /**
+     * Always `true`: all expressions are already resolved when the extra optimization is executed.
+     */
+    override lazy val resolved: Boolean = true
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/10a4c48b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/optimization/accumulator/SelectAccumulator.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/impl/optimization/accumulator/SelectAccumulator.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/optimization/accumulator/SelectAccumulator.scala
new file mode 100644
index 0000000..c1db6f9
--- /dev/null
+++ b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/optimization/accumulator/SelectAccumulator.scala
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spark.impl.optimization.accumulator
+
+import org.apache.spark.sql.catalyst.expressions.Expression
+
+/**
+ * Contract of an accumulator that collects the parts of a SELECT query.
+ */
+private[apache] trait SelectAccumulator extends QueryAccumulator {
+    /**
+     * @return Expressions of the WHERE part of the query, if set.
+     */
+    def where: Option[Seq[Expression]]
+
+    /**
+     * @return Expressions of the GROUP BY part of the query, if set.
+     */
+    def groupBy: Option[Seq[Expression]]
+
+    /**
+     * @return Expressions of the HAVING part of the query, if set.
+     */
+    def having: Option[Seq[Expression]]
+
+    /**
+     * @return Copy of this accumulator carrying the given `distinct` flag.
+     */
+    def withDistinct(distinct: Boolean): SelectAccumulator
+
+    /**
+     * @return Copy of this accumulator carrying the given `where` expressions.
+     */
+    def withWhere(where: Seq[Expression]): SelectAccumulator
+
+    /**
+     * @return Copy of this accumulator carrying the given `groupBy` expressions.
+     */
+    def withGroupBy(groupBy: Seq[Expression]): SelectAccumulator
+
+    /**
+     * @return Copy of this accumulator carrying the given `having` expressions.
+     */
+    def withHaving(having: Seq[Expression]): SelectAccumulator
+
+    /**
+     * @return Copy of this accumulator carrying the given `limit` expression.
+     */
+    def withLimit(limit: Expression): SelectAccumulator
+
+    /**
+     * @return Copy of this accumulator carrying the given `localLimit` expression.
+     */
+    def withLocalLimit(localLimit: Expression): SelectAccumulator
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/10a4c48b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/optimization/accumulator/SingleTableSQLAccumulator.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/impl/optimization/accumulator/SingleTableSQLAccumulator.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/optimization/accumulator/SingleTableSQLAccumulator.scala
new file mode 100644
index 0000000..47035b9
--- /dev/null
+++ b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/optimization/accumulator/SingleTableSQLAccumulator.scala
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spark.impl.optimization.accumulator
+
+import org.apache.ignite.IgniteException
+import org.apache.ignite.spark.impl.optimization._
+import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, NamedExpression, SortOrder}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+
+/**
+ * Accumulates the parts of a SQL SELECT that reads from a single Ignite table or from one nested query.
+ *
+ * See <a href="http://www.h2database.com/html/grammar.html#select">select syntax of H2</a>.
+ */
+private[apache] case class SingleTableSQLAccumulator(
+    igniteQueryContext: IgniteQueryContext,
+    table: Option[String],
+    tableExpression: Option[(QueryAccumulator, String)],
+    outputExpressions: Seq[NamedExpression],
+    distinct: Boolean = false,
+    all: Boolean = false,
+    where: Option[Seq[Expression]] = None,
+    groupBy: Option[Seq[Expression]] = None,
+    having: Option[Seq[Expression]] = None,
+    limit: Option[Expression] = None,
+    localLimit: Option[Expression] = None,
+    orderBy: Option[Seq[SortOrder]] = None
+) extends SelectAccumulator {
+    /**
+     * Builds the SQL text from the accumulated parts; `all` and `localLimit` are not rendered.
+     *
+     * @param prettyPrint If true, line breaks are used between query parts instead of single spaces.
+     * @return SQL query.
+     */
+    override def compileQuery(prettyPrint: Boolean = false): String = {
+        val delim = if (prettyPrint) "\n" else " "
+        val tab = if (prettyPrint) " " else ""
+
+        // Renders one optional clause; a missing or empty expression list yields nothing.
+        def clause(header: String, exprs: Option[Seq[Expression]], sep: String): Option[String] =
+            exprs.filter(_.nonEmpty).map(es ⇒ es.map(exprToString(_)).mkString(s"$delim$header", sep, ""))
+
+        // Honour the flag set through `withDistinct`; with the default `false` the text is unchanged.
+        val select = if (distinct) "SELECT DISTINCT" else "SELECT"
+
+        val head = s"$select$delim$tab${outputExpressions.map(exprToString(_)).mkString(", ")}$delim" +
+            s"FROM$delim$tab$compiledTableExpression"
+
+        val tail = Seq(
+            clause(s"WHERE$delim$tab", where, s" AND$delim$tab"),
+            clause("GROUP BY ", groupBy, s",$delim$tab"),
+            clause("HAVING ", having, s" AND$delim$tab"),
+            clause("ORDER BY ", orderBy, s",$delim$tab"),
+            limit.map(lim ⇒ s" LIMIT ${exprToString(lim)}")
+        ).flatten
+
+        (head +: tail).mkString
+    }
+
+    /**
+     * @return FROM part: the table name, or the nested query in parentheses followed by its alias.
+     * @throws IgniteException If neither `table` nor `tableExpression` is defined.
+     */
+    private def compiledTableExpression: String =
+        table
+            .orElse(tableExpression.map { case (acc, alias) ⇒ s"(${acc.compileQuery()}) $alias" })
+            .getOrElse(throw new IgniteException("Unknown table."))
+
+    /** @return One-line description of this plan node; `tableExpression` and `localLimit` are left out. */
+    override def simpleString: String =
+        s"IgniteSQLAccumulator(table: $table, columns: $outputExpressions, distinct: $distinct, all: $all, " +
+            s"where: $where, groupBy: $groupBy, having: $having, limit: $limit, orderBy: $orderBy)"
+
+    /** @return Copy of this accumulator with the given output expressions. */
+    override def withOutputExpressions(outputExpressions: Seq[NamedExpression]): SingleTableSQLAccumulator =
+        copy(outputExpressions = outputExpressions)
+
+    /** @return Copy of this accumulator with the given `distinct` flag. */
+    override def withDistinct(distinct: Boolean): SingleTableSQLAccumulator = copy(distinct = distinct)
+
+    /** @return Copy of this accumulator with the given WHERE expressions. */
+    override def withWhere(where: Seq[Expression]): SingleTableSQLAccumulator = copy(where = Some(where))
+
+    /** @return Copy of this accumulator with the given GROUP BY expressions. */
+    override def withGroupBy(groupBy: Seq[Expression]): SingleTableSQLAccumulator = copy(groupBy = Some(groupBy))
+
+    /** @return Copy of this accumulator with the given HAVING expressions. */
+    override def withHaving(having: Seq[Expression]): SingleTableSQLAccumulator = copy(having = Some(having))
+
+    /** @return Copy of this accumulator with the given LIMIT expression. */
+    override def withLimit(limit: Expression): SingleTableSQLAccumulator = copy(limit = Some(limit))
+
+    /** @return Copy of this accumulator with the given local limit; `compileQuery` does not render it. */
+    override def withLocalLimit(localLimit: Expression): SingleTableSQLAccumulator = copy(localLimit = Some(localLimit))
+
+    /** @return Copy of this accumulator with the given ORDER BY expressions. */
+    override def withOrderBy(orderBy: Seq[SortOrder]): SingleTableSQLAccumulator = copy(orderBy = Some(orderBy))
+
+    /** @return One attribute per output expression, built by `toAttributeReference`. */
+    override def output: Seq[Attribute] = outputExpressions.map(expr ⇒ toAttributeReference(expr, Seq.empty))
+
+    /** @return Table name, or the alias of the nested query when no table name is set. */
+    override def qualifier: String =
+        // Fails like compiledTableExpression does, instead of a bare NoSuchElementException from Option.get.
+        table.orElse(tableExpression.map(_._2)).getOrElse(throw new IgniteException("Unknown table."))
+
+    /** @return The nested query accumulator as the only child, or no children when there is none. */
+    override def children: Seq[LogicalPlan] = tableExpression.map(_._1).toList
+}