You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by rx...@apache.org on 2015/06/13 08:06:41 UTC
[4/5] spark git commit: [SPARK-7186] [SQL] Decouple internal Row from external Row
http://git-wip-us.apache.org/repos/asf/spark/blob/d46f8e5d/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregates.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregates.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregates.scala
index 0266084..f9e8150 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregates.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregates.scala
@@ -19,9 +19,10 @@ package org.apache.spark.sql.catalyst.expressions
import com.clearspring.analytics.stream.cardinality.HyperLogLog
-import org.apache.spark.sql.types._
-import org.apache.spark.sql.catalyst.trees
+import org.apache.spark.sql.catalyst
import org.apache.spark.sql.catalyst.errors.TreeNodeException
+import org.apache.spark.sql.catalyst.trees
+import org.apache.spark.sql.types._
import org.apache.spark.util.collection.OpenHashSet
abstract class AggregateExpression extends Expression {
@@ -37,7 +38,7 @@ abstract class AggregateExpression extends Expression {
* [[AggregateExpression.eval]] should never be invoked because [[AggregateExpression]]'s are
* replaced with a physical aggregate operator at runtime.
*/
- override def eval(input: Row = null): Any =
+ override def eval(input: catalyst.InternalRow = null): Any =
throw new TreeNodeException(this, s"No function to evaluate expression. type: ${this.nodeName}")
}
@@ -80,7 +81,7 @@ abstract class AggregateFunction
override def nullable: Boolean = base.nullable
override def dataType: DataType = base.dataType
- def update(input: Row): Unit
+ def update(input: catalyst.InternalRow): Unit
// Do we really need this?
override def newInstance(): AggregateFunction = {
@@ -108,7 +109,7 @@ case class MinFunction(expr: Expression, base: AggregateExpression) extends Aggr
val currentMin: MutableLiteral = MutableLiteral(null, expr.dataType)
val cmp = GreaterThan(currentMin, expr)
- override def update(input: Row): Unit = {
+ override def update(input: catalyst.InternalRow): Unit = {
if (currentMin.value == null) {
currentMin.value = expr.eval(input)
} else if (cmp.eval(input) == true) {
@@ -116,7 +117,7 @@ case class MinFunction(expr: Expression, base: AggregateExpression) extends Aggr
}
}
- override def eval(input: Row): Any = currentMin.value
+ override def eval(input: catalyst.InternalRow): Any = currentMin.value
}
case class Max(child: Expression) extends PartialAggregate with trees.UnaryNode[Expression] {
@@ -139,7 +140,7 @@ case class MaxFunction(expr: Expression, base: AggregateExpression) extends Aggr
val currentMax: MutableLiteral = MutableLiteral(null, expr.dataType)
val cmp = LessThan(currentMax, expr)
- override def update(input: Row): Unit = {
+ override def update(input: catalyst.InternalRow): Unit = {
if (currentMax.value == null) {
currentMax.value = expr.eval(input)
} else if (cmp.eval(input) == true) {
@@ -147,7 +148,7 @@ case class MaxFunction(expr: Expression, base: AggregateExpression) extends Aggr
}
}
- override def eval(input: Row): Any = currentMax.value
+ override def eval(input: catalyst.InternalRow): Any = currentMax.value
}
case class Count(child: Expression) extends PartialAggregate with trees.UnaryNode[Expression] {
@@ -205,14 +206,14 @@ case class CollectHashSetFunction(
@transient
val distinctValue = new InterpretedProjection(expr)
- override def update(input: Row): Unit = {
+ override def update(input: catalyst.InternalRow): Unit = {
val evaluatedExpr = distinctValue(input)
if (!evaluatedExpr.anyNull) {
seen.add(evaluatedExpr)
}
}
- override def eval(input: Row): Any = {
+ override def eval(input: catalyst.InternalRow): Any = {
seen
}
}
@@ -238,7 +239,7 @@ case class CombineSetsAndCountFunction(
val seen = new OpenHashSet[Any]()
- override def update(input: Row): Unit = {
+ override def update(input: catalyst.InternalRow): Unit = {
val inputSetEval = inputSet.eval(input).asInstanceOf[OpenHashSet[Any]]
val inputIterator = inputSetEval.iterator
while (inputIterator.hasNext) {
@@ -246,7 +247,7 @@ case class CombineSetsAndCountFunction(
}
}
- override def eval(input: Row): Any = seen.size.toLong
+ override def eval(input: catalyst.InternalRow): Any = seen.size.toLong
}
/** The data type of ApproxCountDistinctPartition since its output is a HyperLogLog object. */
@@ -453,7 +454,7 @@ case class CombineSetsAndSumFunction(
val seen = new OpenHashSet[Any]()
- override def update(input: Row): Unit = {
+ override def update(input: catalyst.InternalRow): Unit = {
val inputSetEval = inputSet.eval(input).asInstanceOf[OpenHashSet[Any]]
val inputIterator = inputSetEval.iterator
while (inputIterator.hasNext) {
@@ -461,8 +462,8 @@ case class CombineSetsAndSumFunction(
}
}
- override def eval(input: Row): Any = {
- val casted = seen.asInstanceOf[OpenHashSet[Row]]
+ override def eval(input: catalyst.InternalRow): Any = {
+ val casted = seen.asInstanceOf[OpenHashSet[catalyst.InternalRow]]
if (casted.size == 0) {
null
} else {
@@ -524,7 +525,7 @@ case class AverageFunction(expr: Expression, base: AggregateExpression)
private def addFunction(value: Any) = Add(sum,
Cast(Literal.create(value, expr.dataType), calcType))
- override def eval(input: Row): Any = {
+ override def eval(input: catalyst.InternalRow): Any = {
if (count == 0L) {
null
} else {
@@ -541,7 +542,7 @@ case class AverageFunction(expr: Expression, base: AggregateExpression)
}
}
- override def update(input: Row): Unit = {
+ override def update(input: catalyst.InternalRow): Unit = {
val evaluatedExpr = expr.eval(input)
if (evaluatedExpr != null) {
count += 1
@@ -555,14 +556,14 @@ case class CountFunction(expr: Expression, base: AggregateExpression) extends Ag
var count: Long = _
- override def update(input: Row): Unit = {
+ override def update(input: catalyst.InternalRow): Unit = {
val evaluatedExpr = expr.eval(input)
if (evaluatedExpr != null) {
count += 1L
}
}
- override def eval(input: Row): Any = count
+ override def eval(input: catalyst.InternalRow): Any = count
}
case class ApproxCountDistinctPartitionFunction(
@@ -574,14 +575,14 @@ case class ApproxCountDistinctPartitionFunction(
private val hyperLogLog = new HyperLogLog(relativeSD)
- override def update(input: Row): Unit = {
+ override def update(input: catalyst.InternalRow): Unit = {
val evaluatedExpr = expr.eval(input)
if (evaluatedExpr != null) {
hyperLogLog.offer(evaluatedExpr)
}
}
- override def eval(input: Row): Any = hyperLogLog
+ override def eval(input: catalyst.InternalRow): Any = hyperLogLog
}
case class ApproxCountDistinctMergeFunction(
@@ -593,12 +594,12 @@ case class ApproxCountDistinctMergeFunction(
private val hyperLogLog = new HyperLogLog(relativeSD)
- override def update(input: Row): Unit = {
+ override def update(input: catalyst.InternalRow): Unit = {
val evaluatedExpr = expr.eval(input)
hyperLogLog.addAll(evaluatedExpr.asInstanceOf[HyperLogLog])
}
- override def eval(input: Row): Any = hyperLogLog.cardinality()
+ override def eval(input: catalyst.InternalRow): Any = hyperLogLog.cardinality()
}
case class SumFunction(expr: Expression, base: AggregateExpression) extends AggregateFunction {
@@ -619,11 +620,11 @@ case class SumFunction(expr: Expression, base: AggregateExpression) extends Aggr
private val addFunction =
Coalesce(Seq(Add(Coalesce(Seq(sum, zero)), Cast(expr, calcType)), sum, zero))
- override def update(input: Row): Unit = {
+ override def update(input: catalyst.InternalRow): Unit = {
sum.update(addFunction, input)
}
- override def eval(input: Row): Any = {
+ override def eval(input: catalyst.InternalRow): Any = {
expr.dataType match {
case DecimalType.Fixed(_, _) =>
Cast(sum, dataType).eval(null)
@@ -652,7 +653,7 @@ case class CombineSumFunction(expr: Expression, base: AggregateExpression)
private val addFunction =
Coalesce(Seq(Add(Coalesce(Seq(sum, zero)), Cast(expr, calcType)), sum, zero))
- override def update(input: Row): Unit = {
+ override def update(input: catalyst.InternalRow): Unit = {
val result = expr.eval(input)
// partial sum result can be null only when no input rows present
if(result != null) {
@@ -660,7 +661,7 @@ case class CombineSumFunction(expr: Expression, base: AggregateExpression)
}
}
- override def eval(input: Row): Any = {
+ override def eval(input: catalyst.InternalRow): Any = {
expr.dataType match {
case DecimalType.Fixed(_, _) =>
Cast(sum, dataType).eval(null)
@@ -676,14 +677,14 @@ case class SumDistinctFunction(expr: Expression, base: AggregateExpression)
private val seen = new scala.collection.mutable.HashSet[Any]()
- override def update(input: Row): Unit = {
+ override def update(input: catalyst.InternalRow): Unit = {
val evaluatedExpr = expr.eval(input)
if (evaluatedExpr != null) {
seen += evaluatedExpr
}
}
- override def eval(input: Row): Any = {
+ override def eval(input: catalyst.InternalRow): Any = {
if (seen.size == 0) {
null
} else {
@@ -707,14 +708,14 @@ case class CountDistinctFunction(
@transient
val distinctValue = new InterpretedProjection(expr)
- override def update(input: Row): Unit = {
+ override def update(input: catalyst.InternalRow): Unit = {
val evaluatedExpr = distinctValue(input)
if (!evaluatedExpr.anyNull) {
seen.add(evaluatedExpr)
}
}
- override def eval(input: Row): Any = seen.size.toLong
+ override def eval(input: catalyst.InternalRow): Any = seen.size.toLong
}
case class FirstFunction(expr: Expression, base: AggregateExpression) extends AggregateFunction {
@@ -722,13 +723,13 @@ case class FirstFunction(expr: Expression, base: AggregateExpression) extends Ag
var result: Any = null
- override def update(input: Row): Unit = {
+ override def update(input: catalyst.InternalRow): Unit = {
if (result == null) {
result = expr.eval(input)
}
}
- override def eval(input: Row): Any = result
+ override def eval(input: catalyst.InternalRow): Any = result
}
case class LastFunction(expr: Expression, base: AggregateExpression) extends AggregateFunction {
@@ -736,11 +737,11 @@ case class LastFunction(expr: Expression, base: AggregateExpression) extends Agg
var result: Any = null
- override def update(input: Row): Unit = {
+ override def update(input: catalyst.InternalRow): Unit = {
result = input
}
- override def eval(input: Row): Any = {
- if (result != null) expr.eval(result.asInstanceOf[Row]) else null
+ override def eval(input: catalyst.InternalRow): Any = {
+ if (result != null) expr.eval(result.asInstanceOf[catalyst.InternalRow]) else null
}
}
http://git-wip-us.apache.org/repos/asf/spark/blob/d46f8e5d/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/arithmetic.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/arithmetic.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/arithmetic.scala
index 124274c..0ba2ff7 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/arithmetic.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/arithmetic.scala
@@ -17,8 +17,9 @@
package org.apache.spark.sql.catalyst.expressions
+import org.apache.spark.sql.catalyst
import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
-import org.apache.spark.sql.catalyst.expressions.codegen.{GeneratedExpressionCode, CodeGenContext}
+import org.apache.spark.sql.catalyst.expressions.codegen.{CodeGenContext, GeneratedExpressionCode}
import org.apache.spark.sql.catalyst.util.TypeUtils
import org.apache.spark.sql.types._
@@ -29,7 +30,7 @@ abstract class UnaryArithmetic extends UnaryExpression {
override def nullable: Boolean = child.nullable
override def dataType: DataType = child.dataType
- override def eval(input: Row): Any = {
+ override def eval(input: catalyst.InternalRow): Any = {
val evalE = child.eval(input)
if (evalE == null) {
null
@@ -124,7 +125,7 @@ abstract class BinaryArithmetic extends BinaryExpression {
protected def checkTypesInternal(t: DataType): TypeCheckResult
- override def eval(input: Row): Any = {
+ override def eval(input: catalyst.InternalRow): Any = {
val evalE1 = left.eval(input)
if(evalE1 == null) {
null
@@ -219,7 +220,7 @@ case class Divide(left: Expression, right: Expression) extends BinaryArithmetic
case it: IntegralType => it.integral.asInstanceOf[Integral[Any]].quot
}
- override def eval(input: Row): Any = {
+ override def eval(input: catalyst.InternalRow): Any = {
val evalE2 = right.eval(input)
if (evalE2 == null || evalE2 == 0) {
null
@@ -279,7 +280,7 @@ case class Remainder(left: Expression, right: Expression) extends BinaryArithmet
case i: FractionalType => i.asIntegral.asInstanceOf[Integral[Any]]
}
- override def eval(input: Row): Any = {
+ override def eval(input: catalyst.InternalRow): Any = {
val evalE2 = right.eval(input)
if (evalE2 == null || evalE2 == 0) {
null
@@ -330,7 +331,7 @@ case class MaxOf(left: Expression, right: Expression) extends BinaryArithmetic {
private lazy val ordering = TypeUtils.getOrdering(dataType)
- override def eval(input: Row): Any = {
+ override def eval(input: catalyst.InternalRow): Any = {
val evalE1 = left.eval(input)
val evalE2 = right.eval(input)
if (evalE1 == null) {
@@ -384,7 +385,7 @@ case class MinOf(left: Expression, right: Expression) extends BinaryArithmetic {
private lazy val ordering = TypeUtils.getOrdering(dataType)
- override def eval(input: Row): Any = {
+ override def eval(input: catalyst.InternalRow): Any = {
val evalE1 = left.eval(input)
val evalE2 = right.eval(input)
if (evalE1 == null) {
http://git-wip-us.apache.org/repos/asf/spark/blob/d46f8e5d/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
index 536e477..244a066 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
@@ -24,6 +24,7 @@ import com.google.common.cache.{CacheBuilder, CacheLoader}
import org.codehaus.janino.ClassBodyEvaluator
import org.apache.spark.Logging
+import org.apache.spark.sql.catalyst
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String
@@ -34,7 +35,7 @@ class IntegerHashSet extends org.apache.spark.util.collection.OpenHashSet[Int]
class LongHashSet extends org.apache.spark.util.collection.OpenHashSet[Long]
/**
- * Java source for evaluating an [[Expression]] given a [[Row]] of input.
+ * Java source for evaluating an [[Expression]] given a [[catalyst.InternalRow]] of input.
*
* @param code The sequence of statements required to evaluate the expression.
* @param isNull A term that holds a boolean value representing whether the expression evaluated
@@ -183,13 +184,13 @@ class CodeGenContext {
}
/**
- * List of data types that have special accessors and setters in [[Row]].
+ * List of data types that have special accessors and setters in [[catalyst.InternalRow]].
*/
val nativeTypes =
Seq(IntegerType, BooleanType, LongType, DoubleType, FloatType, ShortType, ByteType)
/**
- * Returns true if the data type has a special accessor and setter in [[Row]].
+ * Returns true if the data type has a special accessor and setter in [[catalyst.InternalRow]].
*/
def isNativeType(dt: DataType): Boolean = nativeTypes.contains(dt)
http://git-wip-us.apache.org/repos/asf/spark/blob/d46f8e5d/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateMutableProjection.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateMutableProjection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateMutableProjection.scala
index ed3df54..35cb954 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateMutableProjection.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateMutableProjection.scala
@@ -17,6 +17,7 @@
package org.apache.spark.sql.catalyst.expressions.codegen
+import org.apache.spark.sql.catalyst
import org.apache.spark.sql.catalyst.expressions._
// MutableProjection is not accessible in Java
@@ -24,7 +25,7 @@ abstract class BaseMutableProjection extends MutableProjection {}
/**
* Generates byte code that produces a [[MutableRow]] object that can update itself based on a new
- * input [[Row]] for a fixed set of [[Expression Expressions]].
+ * input [[catalyst.InternalRow]] for a fixed set of [[Expression Expressions]].
*/
object GenerateMutableProjection extends CodeGenerator[Seq[Expression], () => MutableProjection] {
@@ -47,7 +48,7 @@ object GenerateMutableProjection extends CodeGenerator[Seq[Expression], () => Mu
"""
}.mkString("\n")
val code = s"""
- import org.apache.spark.sql.Row;
+ import org.apache.spark.sql.catalyst.InternalRow;
public SpecificProjection generate($exprType[] expr) {
return new SpecificProjection(expr);
@@ -69,12 +70,12 @@ object GenerateMutableProjection extends CodeGenerator[Seq[Expression], () => Mu
}
/* Provide immutable access to the last projected row. */
- public Row currentValue() {
- return mutableRow;
+ public InternalRow currentValue() {
+ return (InternalRow) mutableRow;
}
public Object apply(Object _i) {
- Row i = (Row) _i;
+ InternalRow i = (InternalRow) _i;
$projectionCode
return mutableRow;
http://git-wip-us.apache.org/repos/asf/spark/blob/d46f8e5d/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateOrdering.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateOrdering.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateOrdering.scala
index 56ecc5f..db5d570 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateOrdering.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateOrdering.scala
@@ -19,15 +19,15 @@ package org.apache.spark.sql.catalyst.expressions.codegen
import org.apache.spark.Logging
import org.apache.spark.annotation.Private
-import org.apache.spark.sql.Row
+import org.apache.spark.sql.{catalyst, Row}
import org.apache.spark.sql.catalyst.expressions._
/**
* Inherits some default implementation for Java from `Ordering[Row]`
*/
@Private
-class BaseOrdering extends Ordering[Row] {
- def compare(a: Row, b: Row): Int = {
+class BaseOrdering extends Ordering[catalyst.InternalRow] {
+ def compare(a: catalyst.InternalRow, b: catalyst.InternalRow): Int = {
throw new UnsupportedOperationException
}
}
@@ -36,7 +36,8 @@ class BaseOrdering extends Ordering[Row] {
* Generates bytecode for an [[Ordering]] of [[Row Rows]] for a given set of
* [[Expression Expressions]].
*/
-object GenerateOrdering extends CodeGenerator[Seq[SortOrder], Ordering[Row]] with Logging {
+object GenerateOrdering
+ extends CodeGenerator[Seq[SortOrder], Ordering[catalyst.InternalRow]] with Logging {
import scala.reflect.runtime.universe._
protected def canonicalize(in: Seq[SortOrder]): Seq[SortOrder] =
@@ -45,7 +46,7 @@ object GenerateOrdering extends CodeGenerator[Seq[SortOrder], Ordering[Row]] wit
protected def bind(in: Seq[SortOrder], inputSchema: Seq[Attribute]): Seq[SortOrder] =
in.map(BindReferences.bindReference(_, inputSchema))
- protected def create(ordering: Seq[SortOrder]): Ordering[Row] = {
+ protected def create(ordering: Seq[SortOrder]): Ordering[catalyst.InternalRow] = {
val a = newTermName("a")
val b = newTermName("b")
val ctx = newCodeGenContext()
@@ -75,7 +76,7 @@ object GenerateOrdering extends CodeGenerator[Seq[SortOrder], Ordering[Row]] wit
}.mkString("\n")
val code = s"""
- import org.apache.spark.sql.Row;
+ import org.apache.spark.sql.catalyst.InternalRow;
public SpecificOrdering generate($exprType[] expr) {
return new SpecificOrdering(expr);
@@ -90,8 +91,8 @@ object GenerateOrdering extends CodeGenerator[Seq[SortOrder], Ordering[Row]] wit
}
@Override
- public int compare(Row a, Row b) {
- Row i = null; // Holds current row being evaluated.
+ public int compare(InternalRow a, InternalRow b) {
+ InternalRow i = null; // Holds current row being evaluated.
$comparisons
return 0;
}
http://git-wip-us.apache.org/repos/asf/spark/blob/d46f8e5d/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GeneratePredicate.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GeneratePredicate.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GeneratePredicate.scala
index 4a547b5..9e191dc 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GeneratePredicate.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GeneratePredicate.scala
@@ -17,30 +17,31 @@
package org.apache.spark.sql.catalyst.expressions.codegen
+import org.apache.spark.sql.catalyst
import org.apache.spark.sql.catalyst.expressions._
/**
* Interface for generated predicate
*/
abstract class Predicate {
- def eval(r: Row): Boolean
+ def eval(r: catalyst.InternalRow): Boolean
}
/**
- * Generates bytecode that evaluates a boolean [[Expression]] on a given input [[Row]].
+ * Generates bytecode that evaluates a boolean [[Expression]] on a given input [[InternalRow]].
*/
-object GeneratePredicate extends CodeGenerator[Expression, (Row) => Boolean] {
+object GeneratePredicate extends CodeGenerator[Expression, (catalyst.InternalRow) => Boolean] {
protected def canonicalize(in: Expression): Expression = ExpressionCanonicalizer.execute(in)
protected def bind(in: Expression, inputSchema: Seq[Attribute]): Expression =
BindReferences.bindReference(in, inputSchema)
- protected def create(predicate: Expression): ((Row) => Boolean) = {
+ protected def create(predicate: Expression): ((catalyst.InternalRow) => Boolean) = {
val ctx = newCodeGenContext()
val eval = predicate.gen(ctx)
val code = s"""
- import org.apache.spark.sql.Row;
+ import org.apache.spark.sql.catalyst.InternalRow;
public SpecificPredicate generate($exprType[] expr) {
return new SpecificPredicate(expr);
@@ -53,7 +54,7 @@ object GeneratePredicate extends CodeGenerator[Expression, (Row) => Boolean] {
}
@Override
- public boolean eval(Row i) {
+ public boolean eval(InternalRow i) {
${eval.code}
return !${eval.isNull} && ${eval.primitive};
}
@@ -65,6 +66,6 @@ object GeneratePredicate extends CodeGenerator[Expression, (Row) => Boolean] {
// fetch the only one method `generate(Expression[])`
val m = c.getDeclaredMethods()(0)
val p = m.invoke(c.newInstance(), ctx.references.toArray).asInstanceOf[Predicate]
- (r: Row) => p.eval(r)
+ (r: catalyst.InternalRow) => p.eval(r)
}
}
http://git-wip-us.apache.org/repos/asf/spark/blob/d46f8e5d/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateProjection.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateProjection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateProjection.scala
index 9b906c3..8b5dc19 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateProjection.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateProjection.scala
@@ -27,9 +27,10 @@ import org.apache.spark.sql.types._
abstract class BaseProject extends Projection {}
/**
- * Generates bytecode that produces a new [[Row]] object based on a fixed set of input
- * [[Expression Expressions]] and a given input [[Row]]. The returned [[Row]] object is custom
- * generated based on the output types of the [[Expression]] to avoid boxing of primitive values.
+ * Generates bytecode that produces a new [[InternalRow]] object based on a fixed set of input
+ * [[Expression Expressions]] and a given input [[InternalRow]]. The returned [[InternalRow]]
+ * object is custom generated based on the output types of the [[Expression]] to avoid boxing of
+ * primitive values.
*/
object GenerateProjection extends CodeGenerator[Seq[Expression], Projection] {
import scala.reflect.runtime.universe._
@@ -146,7 +147,7 @@ object GenerateProjection extends CodeGenerator[Seq[Expression], Projection] {
}.mkString("\n")
val code = s"""
- import org.apache.spark.sql.Row;
+ import org.apache.spark.sql.catalyst.InternalRow;
public SpecificProjection generate($exprType[] expr) {
return new SpecificProjection(expr);
@@ -161,7 +162,7 @@ object GenerateProjection extends CodeGenerator[Seq[Expression], Projection] {
@Override
public Object apply(Object r) {
- return new SpecificRow(expressions, (Row) r);
+ return new SpecificRow(expressions, (InternalRow) r);
}
}
@@ -169,7 +170,7 @@ object GenerateProjection extends CodeGenerator[Seq[Expression], Projection] {
$columns
- public SpecificRow($exprType[] expressions, Row i) {
+ public SpecificRow($exprType[] expressions, InternalRow i) {
$initColumns
}
http://git-wip-us.apache.org/repos/asf/spark/blob/d46f8e5d/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypes.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypes.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypes.scala
index 6398b8f..a6913cc 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypes.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypes.scala
@@ -17,6 +17,7 @@
package org.apache.spark.sql.catalyst.expressions
+import org.apache.spark.sql.catalyst
import org.apache.spark.sql.types._
@@ -41,7 +42,7 @@ case class CreateArray(children: Seq[Expression]) extends Expression {
override def nullable: Boolean = false
- override def eval(input: Row): Any = {
+ override def eval(input: catalyst.InternalRow): Any = {
children.map(_.eval(input))
}
@@ -69,7 +70,7 @@ case class CreateStruct(children: Seq[NamedExpression]) extends Expression {
override def nullable: Boolean = false
- override def eval(input: Row): Any = {
- Row(children.map(_.eval(input)): _*)
+ override def eval(input: catalyst.InternalRow): Any = {
+ InternalRow(children.map(_.eval(input)): _*)
}
}
http://git-wip-us.apache.org/repos/asf/spark/blob/d46f8e5d/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/conditionals.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/conditionals.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/conditionals.scala
index 72b9f23..a119c31 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/conditionals.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/conditionals.scala
@@ -17,6 +17,7 @@
package org.apache.spark.sql.catalyst.expressions
+import org.apache.spark.sql.catalyst
import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
import org.apache.spark.sql.catalyst.expressions.codegen._
import org.apache.spark.sql.types.{BooleanType, DataType}
@@ -42,7 +43,7 @@ case class If(predicate: Expression, trueValue: Expression, falseValue: Expressi
override def dataType: DataType = trueValue.dataType
- override def eval(input: Row): Any = {
+ override def eval(input: catalyst.InternalRow): Any = {
if (true == predicate.eval(input)) {
trueValue.eval(input)
} else {
@@ -137,7 +138,7 @@ case class CaseWhen(branches: Seq[Expression]) extends CaseWhenLike {
}
/** Written in imperative fashion for performance considerations. */
- override def eval(input: Row): Any = {
+ override def eval(input: catalyst.InternalRow): Any = {
val len = branchesArr.length
var i = 0
// If all branches fail and an elseVal is not provided, the whole statement
@@ -229,7 +230,7 @@ case class CaseKeyWhen(key: Expression, branches: Seq[Expression]) extends CaseW
}
/** Written in imperative fashion for performance considerations. */
- override def eval(input: Row): Any = {
+ override def eval(input: catalyst.InternalRow): Any = {
val evaluatedKey = key.eval(input)
val len = branchesArr.length
var i = 0
http://git-wip-us.apache.org/repos/asf/spark/blob/d46f8e5d/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/decimalFunctions.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/decimalFunctions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/decimalFunctions.scala
index 8ab6d97..de8b66b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/decimalFunctions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/decimalFunctions.scala
@@ -17,7 +17,8 @@
package org.apache.spark.sql.catalyst.expressions
-import org.apache.spark.sql.catalyst.expressions.codegen.{GeneratedExpressionCode, CodeGenContext}
+import org.apache.spark.sql.catalyst
+import org.apache.spark.sql.catalyst.expressions.codegen.{CodeGenContext, GeneratedExpressionCode}
import org.apache.spark.sql.types._
/** Return the unscaled Long value of a Decimal, assuming it fits in a Long */
@@ -28,7 +29,7 @@ case class UnscaledValue(child: Expression) extends UnaryExpression {
override def nullable: Boolean = child.nullable
override def toString: String = s"UnscaledValue($child)"
- override def eval(input: Row): Any = {
+ override def eval(input: catalyst.InternalRow): Any = {
val childResult = child.eval(input)
if (childResult == null) {
null
@@ -50,7 +51,7 @@ case class MakeDecimal(child: Expression, precision: Int, scale: Int) extends Un
override def nullable: Boolean = child.nullable
override def toString: String = s"MakeDecimal($child,$precision,$scale)"
- override def eval(input: Row): Decimal = {
+ override def eval(input: catalyst.InternalRow): Decimal = {
val childResult = child.eval(input)
if (childResult == null) {
null
http://git-wip-us.apache.org/repos/asf/spark/blob/d46f8e5d/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/generators.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/generators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/generators.scala
index b6191ea..a80c255 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/generators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/generators.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.catalyst.expressions
import scala.collection.Map
+import org.apache.spark.sql.catalyst
import org.apache.spark.sql.catalyst.{CatalystTypeConverters, trees}
import org.apache.spark.sql.types._
@@ -53,13 +54,13 @@ abstract class Generator extends Expression {
def elementTypes: Seq[(DataType, Boolean)]
/** Should be implemented by child classes to perform specific Generators. */
- override def eval(input: Row): TraversableOnce[Row]
+ override def eval(input: catalyst.InternalRow): TraversableOnce[catalyst.InternalRow]
/**
* Notifies that there are no more rows to process, clean up code, and additional
* rows can be made here.
*/
- def terminate(): TraversableOnce[Row] = Nil
+ def terminate(): TraversableOnce[catalyst.InternalRow] = Nil
}
/**
@@ -67,22 +68,22 @@ abstract class Generator extends Expression {
*/
case class UserDefinedGenerator(
elementTypes: Seq[(DataType, Boolean)],
- function: Row => TraversableOnce[Row],
+ function: catalyst.InternalRow => TraversableOnce[catalyst.InternalRow],
children: Seq[Expression])
extends Generator {
@transient private[this] var inputRow: InterpretedProjection = _
- @transient private[this] var convertToScala: (Row) => Row = _
+ @transient private[this] var convertToScala: (catalyst.InternalRow) => catalyst.InternalRow = _
private def initializeConverters(): Unit = {
inputRow = new InterpretedProjection(children)
convertToScala = {
val inputSchema = StructType(children.map(e => StructField(e.simpleString, e.dataType, true)))
CatalystTypeConverters.createToScalaConverter(inputSchema)
- }.asInstanceOf[(Row => Row)]
+ }.asInstanceOf[(catalyst.InternalRow => catalyst.InternalRow)]
}
- override def eval(input: Row): TraversableOnce[Row] = {
+ override def eval(input: catalyst.InternalRow): TraversableOnce[catalyst.InternalRow] = {
if (inputRow == null) {
initializeConverters()
}
@@ -108,7 +109,7 @@ case class Explode(child: Expression)
case MapType(kt, vt, valueContainsNull) => (kt, false) :: (vt, valueContainsNull) :: Nil
}
- override def eval(input: Row): TraversableOnce[Row] = {
+ override def eval(input: catalyst.InternalRow): TraversableOnce[catalyst.InternalRow] = {
child.dataType match {
case ArrayType(_, _) =>
val inputArray = child.eval(input).asInstanceOf[Seq[Any]]
http://git-wip-us.apache.org/repos/asf/spark/blob/d46f8e5d/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala
index a33007b..d8fff2b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.catalyst.expressions
import java.sql.{Date, Timestamp}
+import org.apache.spark.sql.catalyst
import org.apache.spark.sql.catalyst.CatalystTypeConverters
import org.apache.spark.sql.catalyst.expressions.codegen.{CodeGenContext, GeneratedExpressionCode}
import org.apache.spark.sql.catalyst.util.DateUtils
@@ -87,7 +88,7 @@ case class Literal protected (value: Any, dataType: DataType) extends LeafExpres
case _ => false
}
- override def eval(input: Row): Any = value
+ override def eval(input: catalyst.InternalRow): Any = value
override def genCode(ctx: CodeGenContext, ev: GeneratedExpressionCode): String = {
// change the isNull and primitive to consts, to inline them
@@ -142,9 +143,9 @@ case class Literal protected (value: Any, dataType: DataType) extends LeafExpres
case class MutableLiteral(var value: Any, dataType: DataType, nullable: Boolean = true)
extends LeafExpression {
- def update(expression: Expression, input: Row): Unit = {
+ def update(expression: Expression, input: catalyst.InternalRow): Unit = {
value = expression.eval(input)
}
- override def eval(input: Row): Any = value
+ override def eval(input: catalyst.InternalRow): Any = value
}
http://git-wip-us.apache.org/repos/asf/spark/blob/d46f8e5d/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/math.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/math.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/math.scala
index 97e960b..6f90d60 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/math.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/math.scala
@@ -17,6 +17,7 @@
package org.apache.spark.sql.catalyst.expressions
+import org.apache.spark.sql.catalyst
import org.apache.spark.sql.catalyst.expressions.codegen._
import org.apache.spark.sql.types.{DataType, DoubleType}
@@ -34,7 +35,7 @@ abstract class LeafMathExpression(c: Double, name: String)
override def nullable: Boolean = false
override def toString: String = s"$name()"
- override def eval(input: Row): Any = c
+ override def eval(input: catalyst.InternalRow): Any = c
override def genCode(ctx: CodeGenContext, ev: GeneratedExpressionCode): String = {
s"""
@@ -60,7 +61,7 @@ abstract class UnaryMathExpression(f: Double => Double, name: String)
override def nullable: Boolean = true
override def toString: String = s"$name($child)"
- override def eval(input: Row): Any = {
+ override def eval(input: catalyst.InternalRow): Any = {
val evalE = child.eval(input)
if (evalE == null) {
null
@@ -103,7 +104,7 @@ abstract class BinaryMathExpression(f: (Double, Double) => Double, name: String)
override def dataType: DataType = DoubleType
- override def eval(input: Row): Any = {
+ override def eval(input: catalyst.InternalRow): Any = {
val evalE1 = left.eval(input)
if (evalE1 == null) {
null
@@ -215,7 +216,7 @@ case class ToRadians(child: Expression) extends UnaryMathExpression(math.toRadia
case class Atan2(left: Expression, right: Expression)
extends BinaryMathExpression(math.atan2, "ATAN2") {
- override def eval(input: Row): Any = {
+ override def eval(input: catalyst.InternalRow): Any = {
val evalE1 = left.eval(input)
if (evalE1 == null) {
null
http://git-wip-us.apache.org/repos/asf/spark/blob/d46f8e5d/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
index 2e4b9ba..2050512 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
@@ -17,6 +17,7 @@
package org.apache.spark.sql.catalyst.expressions
+import org.apache.spark.sql.catalyst
import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
import org.apache.spark.sql.catalyst.errors.TreeNodeException
import org.apache.spark.sql.catalyst.expressions.codegen.{CodeGenContext, GeneratedExpressionCode}
@@ -114,7 +115,7 @@ case class Alias(child: Expression, name: String)(
// Alias(Generator, xx) need to be transformed into Generate(generator, ...)
override lazy val resolved = childrenResolved && !child.isInstanceOf[Generator]
- override def eval(input: Row): Any = child.eval(input)
+ override def eval(input: catalyst.InternalRow): Any = child.eval(input)
override def gen(ctx: CodeGenContext): GeneratedExpressionCode = child.gen(ctx)
@@ -230,7 +231,7 @@ case class AttributeReference(
}
// Unresolved attributes are transient at compile time and don't get evaluated during execution.
- override def eval(input: Row = null): Any =
+ override def eval(input: catalyst.InternalRow = null): Any =
throw new TreeNodeException(this, s"No function to evaluate expression. type: ${this.nodeName}")
override def toString: String = s"$name#${exprId.id}$typeSuffix"
@@ -252,7 +253,7 @@ case class PrettyAttribute(name: String) extends Attribute with trees.LeafNode[E
override def withName(newName: String): Attribute = throw new UnsupportedOperationException
override def qualifiers: Seq[String] = throw new UnsupportedOperationException
override def exprId: ExprId = throw new UnsupportedOperationException
- override def eval(input: Row): Any = throw new UnsupportedOperationException
+ override def eval(input: catalyst.InternalRow): Any = throw new UnsupportedOperationException
override def nullable: Boolean = throw new UnsupportedOperationException
override def dataType: DataType = NullType
}
http://git-wip-us.apache.org/repos/asf/spark/blob/d46f8e5d/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/nullFunctions.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/nullFunctions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/nullFunctions.scala
index c2d1a4e..292d626 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/nullFunctions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/nullFunctions.scala
@@ -17,6 +17,7 @@
package org.apache.spark.sql.catalyst.expressions
+import org.apache.spark.sql.catalyst
import org.apache.spark.sql.catalyst.expressions.codegen.{GeneratedExpressionCode, CodeGenContext}
import org.apache.spark.sql.catalyst.trees
import org.apache.spark.sql.catalyst.analysis.UnresolvedException
@@ -43,7 +44,7 @@ case class Coalesce(children: Seq[Expression]) extends Expression {
this, s"Coalesce cannot have children of different types. $childTypes")
}
- override def eval(input: Row): Any = {
+ override def eval(input: catalyst.InternalRow): Any = {
var i = 0
var result: Any = null
val childIterator = children.iterator
@@ -77,7 +78,7 @@ case class IsNull(child: Expression) extends Predicate with trees.UnaryNode[Expr
override def foldable: Boolean = child.foldable
override def nullable: Boolean = false
- override def eval(input: Row): Any = {
+ override def eval(input: catalyst.InternalRow): Any = {
child.eval(input) == null
}
@@ -96,7 +97,7 @@ case class IsNotNull(child: Expression) extends Predicate with trees.UnaryNode[E
override def nullable: Boolean = false
override def toString: String = s"IS NOT NULL $child"
- override def eval(input: Row): Any = {
+ override def eval(input: catalyst.InternalRow): Any = {
child.eval(input) != null
}
@@ -118,7 +119,7 @@ case class AtLeastNNonNulls(n: Int, children: Seq[Expression]) extends Predicate
private[this] val childrenArray = children.toArray
- override def eval(input: Row): Boolean = {
+ override def eval(input: catalyst.InternalRow): Boolean = {
var numNonNulls = 0
var i = 0
while (i < childrenArray.length && numNonNulls < n) {
http://git-wip-us.apache.org/repos/asf/spark/blob/d46f8e5d/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/package.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/package.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/package.scala
index fbc97b2..c2e57b4 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/package.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/package.scala
@@ -17,6 +17,8 @@
package org.apache.spark.sql.catalyst
+import org.apache.spark.sql.catalyst
+
/**
* A set of classes that can be used to represent trees of relational expressions. A key goal of
* the expression library is to hide the details of naming and scoping from developers who want to
@@ -49,30 +51,30 @@ package org.apache.spark.sql.catalyst
*/
package object expressions {
- type Row = org.apache.spark.sql.Row
+ type InternalRow = catalyst.InternalRow
- val Row = org.apache.spark.sql.Row
+ val InternalRow = catalyst.InternalRow
/**
- * Converts a [[Row]] to another Row given a sequence of expression that define each column of the
- * new row. If the schema of the input row is specified, then the given expression will be bound
- * to that schema.
+ * Converts an [[InternalRow]] to another [[InternalRow]] given a sequence of expressions that
+ * define each column of the new row. If the schema of the input row is specified, then the given
+ * expressions will be bound to that schema.
*/
- abstract class Projection extends (Row => Row)
+ abstract class Projection extends (InternalRow => InternalRow)
/**
- * Converts a [[Row]] to another Row given a sequence of expression that define each column of the
- * new row. If the schema of the input row is specified, then the given expression will be bound
- * to that schema.
+ * Converts an [[InternalRow]] to another [[InternalRow]] given a sequence of expressions that
+ * define each column of the new row. If the schema of the input row is specified, then the given
+ * expressions will be bound to that schema.
*
* In contrast to a normal projection, a MutableProjection reuses the same underlying row object
* each time an input row is added. This significantly reduces the cost of calculating the
- * projection, but means that it is not safe to hold on to a reference to a [[Row]] after `next()`
- * has been called on the [[Iterator]] that produced it. Instead, the user must call `Row.copy()`
- * and hold on to the returned [[Row]] before calling `next()`.
+ * projection, but means that it is not safe to hold on to a reference to a [[InternalRow]] after
+ * `next()` has been called on the [[Iterator]] that produced it. Instead, the user must call
+ * `InternalRow.copy()` and hold on to the returned [[InternalRow]] before calling `next()`.
*/
abstract class MutableProjection extends Projection {
- def currentValue: Row
+ def currentValue: InternalRow
/** Uses the given row to store the output of the projection. */
def target(row: MutableRow): MutableProjection
http://git-wip-us.apache.org/repos/asf/spark/blob/d46f8e5d/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala
index 7574d1c..082d72e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala
@@ -24,11 +24,11 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.types._
object InterpretedPredicate {
- def create(expression: Expression, inputSchema: Seq[Attribute]): (Row => Boolean) =
+ def create(expression: Expression, inputSchema: Seq[Attribute]): (InternalRow => Boolean) =
create(BindReferences.bindReference(expression, inputSchema))
- def create(expression: Expression): (Row => Boolean) = {
- (r: Row) => expression.eval(r).asInstanceOf[Boolean]
+ def create(expression: Expression): (InternalRow => Boolean) = {
+ (r: InternalRow) => expression.eval(r).asInstanceOf[Boolean]
}
}
@@ -77,7 +77,7 @@ case class Not(child: Expression) extends UnaryExpression with Predicate with Ex
override def expectedChildTypes: Seq[DataType] = Seq(BooleanType)
- override def eval(input: Row): Any = {
+ override def eval(input: InternalRow): Any = {
child.eval(input) match {
case null => null
case b: Boolean => !b
@@ -98,7 +98,7 @@ case class In(value: Expression, list: Seq[Expression]) extends Predicate {
override def nullable: Boolean = true // TODO: Figure out correct nullability semantics of IN.
override def toString: String = s"$value IN ${list.mkString("(", ",", ")")}"
- override def eval(input: Row): Any = {
+ override def eval(input: InternalRow): Any = {
val evaluatedValue = value.eval(input)
list.exists(e => e.eval(input) == evaluatedValue)
}
@@ -117,7 +117,7 @@ case class InSet(value: Expression, hset: Set[Any])
override def nullable: Boolean = true // TODO: Figure out correct nullability semantics of IN.
override def toString: String = s"$value INSET ${hset.mkString("(", ",", ")")}"
- override def eval(input: Row): Any = {
+ override def eval(input: InternalRow): Any = {
hset.contains(value.eval(input))
}
}
@@ -129,7 +129,7 @@ case class And(left: Expression, right: Expression)
override def symbol: String = "&&"
- override def eval(input: Row): Any = {
+ override def eval(input: InternalRow): Any = {
val l = left.eval(input)
if (l == false) {
false
@@ -178,7 +178,7 @@ case class Or(left: Expression, right: Expression)
override def symbol: String = "||"
- override def eval(input: Row): Any = {
+ override def eval(input: InternalRow): Any = {
val l = left.eval(input)
if (l == true) {
true
@@ -235,7 +235,7 @@ abstract class BinaryComparison extends BinaryExpression with Predicate {
protected def checkTypesInternal(t: DataType): TypeCheckResult
- override def eval(input: Row): Any = {
+ override def eval(input: InternalRow): Any = {
val evalE1 = left.eval(input)
if (evalE1 == null) {
null
@@ -288,7 +288,7 @@ case class EqualNullSafe(left: Expression, right: Expression) extends BinaryComp
override protected def checkTypesInternal(t: DataType) = TypeCheckResult.TypeCheckSuccess
- override def eval(input: Row): Any = {
+ override def eval(input: InternalRow): Any = {
val l = left.eval(input)
val r = right.eval(input)
if (l == null && r == null) {
http://git-wip-us.apache.org/repos/asf/spark/blob/d46f8e5d/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/random.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/random.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/random.scala
index 6e4e9cb..7e80333 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/random.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/random.scala
@@ -48,7 +48,7 @@ abstract class RDG(seed: Long) extends LeafExpression with Serializable {
/** Generate a random column with i.i.d. uniformly distributed values in [0, 1). */
case class Rand(seed: Long) extends RDG(seed) {
- override def eval(input: Row): Double = rng.nextDouble()
+ override def eval(input: InternalRow): Double = rng.nextDouble()
}
object Rand {
@@ -62,7 +62,7 @@ object Rand {
/** Generate a random column with i.i.d. gaussian random distribution. */
case class Randn(seed: Long) extends RDG(seed) {
- override def eval(input: Row): Double = rng.nextGaussian()
+ override def eval(input: InternalRow): Double = rng.nextGaussian()
}
object Randn {
http://git-wip-us.apache.org/repos/asf/spark/blob/d46f8e5d/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/rows.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/rows.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/rows.scala
index 5d2d820..534dac1 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/rows.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/rows.scala
@@ -21,10 +21,10 @@ import org.apache.spark.sql.types.{DataType, StructType, AtomicType}
import org.apache.spark.unsafe.types.UTF8String
/**
- * An extended interface to [[Row]] that allows the values for each column to be updated. Setting
- * a value through a primitive function implicitly marks that column as not null.
+ * An extended interface to [[InternalRow]] that allows the values for each column to be updated.
+ * Setting a value through a primitive function implicitly marks that column as not null.
*/
-trait MutableRow extends Row {
+trait MutableRow extends InternalRow {
def setNullAt(i: Int): Unit
def update(ordinal: Int, value: Any)
@@ -37,13 +37,12 @@ trait MutableRow extends Row {
def setByte(ordinal: Int, value: Byte)
def setFloat(ordinal: Int, value: Float)
def setString(ordinal: Int, value: String)
- // TODO(davies): add setDate() and setDecimal()
}
/**
* A row with no data. Calling any methods will result in an error. Can be used as a placeholder.
*/
-object EmptyRow extends Row {
+object EmptyRow extends InternalRow {
override def apply(i: Int): Any = throw new UnsupportedOperationException
override def toSeq: Seq[Any] = Seq.empty
override def length: Int = 0
@@ -57,7 +56,7 @@ object EmptyRow extends Row {
override def getByte(i: Int): Byte = throw new UnsupportedOperationException
override def getString(i: Int): String = throw new UnsupportedOperationException
override def getAs[T](i: Int): T = throw new UnsupportedOperationException
- override def copy(): Row = this
+ override def copy(): InternalRow = this
}
/**
@@ -65,7 +64,7 @@ object EmptyRow extends Row {
* the array is not copied, and thus could technically be mutated after creation, this is not
* allowed.
*/
-class GenericRow(protected[sql] val values: Array[Any]) extends Row {
+class GenericRow(protected[sql] val values: Array[Any]) extends InternalRow {
/** No-arg constructor for serialization. */
protected def this() = this(null)
@@ -154,7 +153,7 @@ class GenericRow(protected[sql] val values: Array[Any]) extends Row {
}
override def equals(o: Any): Boolean = o match {
- case other: Row =>
+ case other: InternalRow =>
if (values.length != other.length) {
return false
}
@@ -174,7 +173,7 @@ class GenericRow(protected[sql] val values: Array[Any]) extends Row {
case _ => false
}
- override def copy(): Row = this
+ override def copy(): InternalRow = this
}
class GenericRowWithSchema(values: Array[Any], override val schema: StructType)
@@ -207,15 +206,15 @@ class GenericMutableRow(v: Array[Any]) extends GenericRow(v) with MutableRow {
override def update(ordinal: Int, value: Any): Unit = { values(ordinal) = value }
- override def copy(): Row = new GenericRow(values.clone())
+ override def copy(): InternalRow = new GenericRow(values.clone())
}
-class RowOrdering(ordering: Seq[SortOrder]) extends Ordering[Row] {
+class RowOrdering(ordering: Seq[SortOrder]) extends Ordering[InternalRow] {
def this(ordering: Seq[SortOrder], inputSchema: Seq[Attribute]) =
this(ordering.map(BindReferences.bindReference(_, inputSchema)))
- def compare(a: Row, b: Row): Int = {
+ def compare(a: InternalRow, b: InternalRow): Int = {
var i = 0
while (i < ordering.size) {
val order = ordering(i)
http://git-wip-us.apache.org/repos/asf/spark/blob/d46f8e5d/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/sets.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/sets.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/sets.scala
index 2bcb960..30e4167 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/sets.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/sets.scala
@@ -17,7 +17,7 @@
package org.apache.spark.sql.catalyst.expressions
-import org.apache.spark.sql.catalyst.expressions.codegen.{GeneratedExpressionCode, CodeGenContext}
+import org.apache.spark.sql.catalyst.expressions.codegen.{CodeGenContext, GeneratedExpressionCode}
import org.apache.spark.sql.types._
import org.apache.spark.util.collection.OpenHashSet
@@ -57,7 +57,7 @@ case class NewSet(elementType: DataType) extends LeafExpression {
override def dataType: OpenHashSetUDT = new OpenHashSetUDT(elementType)
- override def eval(input: Row): Any = {
+ override def eval(input: InternalRow): Any = {
new OpenHashSet[Any]()
}
@@ -87,7 +87,7 @@ case class AddItemToSet(item: Expression, set: Expression) extends Expression {
override def dataType: OpenHashSetUDT = set.dataType.asInstanceOf[OpenHashSetUDT]
- override def eval(input: Row): Any = {
+ override def eval(input: InternalRow): Any = {
val itemEval = item.eval(input)
val setEval = set.eval(input).asInstanceOf[OpenHashSet[Any]]
@@ -137,7 +137,7 @@ case class CombineSets(left: Expression, right: Expression) extends BinaryExpres
override def symbol: String = "++="
- override def eval(input: Row): Any = {
+ override def eval(input: InternalRow): Any = {
val leftEval = left.eval(input).asInstanceOf[OpenHashSet[Any]]
if(leftEval != null) {
val rightEval = right.eval(input).asInstanceOf[OpenHashSet[Any]]
@@ -183,7 +183,7 @@ case class CountSet(child: Expression) extends UnaryExpression {
override def dataType: DataType = LongType
- override def eval(input: Row): Any = {
+ override def eval(input: InternalRow): Any = {
val childEval = child.eval(input).asInstanceOf[OpenHashSet[Any]]
if (childEval != null) {
childEval.size.toLong
http://git-wip-us.apache.org/repos/asf/spark/blob/d46f8e5d/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringOperations.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringOperations.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringOperations.scala
index 4f4c195..8ca8d22 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringOperations.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringOperations.scala
@@ -49,7 +49,7 @@ trait StringRegexExpression extends ExpectsInputTypes {
protected def pattern(str: String) = if (cache == null) compile(str) else cache
- override def eval(input: Row): Any = {
+ override def eval(input: InternalRow): Any = {
val l = left.eval(input)
if (l == null) {
null
@@ -121,7 +121,7 @@ trait CaseConversionExpression extends ExpectsInputTypes {
override def dataType: DataType = StringType
override def expectedChildTypes: Seq[DataType] = Seq(StringType)
- override def eval(input: Row): Any = {
+ override def eval(input: InternalRow): Any = {
val evaluated = child.eval(input)
if (evaluated == null) {
null
@@ -169,7 +169,7 @@ trait StringComparison extends ExpectsInputTypes {
override def expectedChildTypes: Seq[DataType] = Seq(StringType, StringType)
- override def eval(input: Row): Any = {
+ override def eval(input: InternalRow): Any = {
val leftEval = left.eval(input)
if(leftEval == null) {
null
@@ -262,7 +262,7 @@ case class Substring(str: Expression, pos: Expression, len: Expression)
(start, end)
}
- override def eval(input: Row): Any = {
+ override def eval(input: InternalRow): Any = {
val string = str.eval(input)
val po = pos.eval(input)
val ln = len.eval(input)
@@ -303,7 +303,7 @@ case class StringLength(child: Expression) extends UnaryExpression with ExpectsI
override def dataType: DataType = IntegerType
override def expectedChildTypes: Seq[DataType] = Seq(StringType)
- override def eval(input: Row): Any = {
+ override def eval(input: InternalRow): Any = {
val string = child.eval(input)
if (string == null) null else string.asInstanceOf[UTF8String].length
}
@@ -314,5 +314,3 @@ case class StringLength(child: Expression) extends UnaryExpression with ExpectsI
defineCodeGen(ctx, ev, c => s"($c).length()")
}
}
-
-
http://git-wip-us.apache.org/repos/asf/spark/blob/d46f8e5d/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala
index 82c4d46..056f170 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala
@@ -74,7 +74,7 @@ case class WindowSpecDefinition(
override def toString: String = simpleString
- override def eval(input: Row): Any = throw new UnsupportedOperationException
+ override def eval(input: InternalRow): Any = throw new UnsupportedOperationException
override def nullable: Boolean = true
override def foldable: Boolean = false
override def dataType: DataType = throw new UnsupportedOperationException
@@ -259,7 +259,7 @@ trait WindowFunction extends Expression {
def reset(): Unit
- def prepareInputParameters(input: Row): AnyRef
+ def prepareInputParameters(input: InternalRow): AnyRef
def update(input: AnyRef): Unit
@@ -286,7 +286,7 @@ case class UnresolvedWindowFunction(
throw new UnresolvedException(this, "init")
override def reset(): Unit =
throw new UnresolvedException(this, "reset")
- override def prepareInputParameters(input: Row): AnyRef =
+ override def prepareInputParameters(input: InternalRow): AnyRef =
throw new UnresolvedException(this, "prepareInputParameters")
override def update(input: AnyRef): Unit =
throw new UnresolvedException(this, "update")
@@ -297,7 +297,7 @@ case class UnresolvedWindowFunction(
override def get(index: Int): Any =
throw new UnresolvedException(this, "get")
// Unresolved functions are transient at compile time and don't get evaluated during execution.
- override def eval(input: Row = null): Any =
+ override def eval(input: InternalRow = null): Any =
throw new TreeNodeException(this, s"No function to evaluate expression. type: ${this.nodeName}")
override def toString: String = s"'$name(${children.mkString(",")})"
@@ -316,7 +316,7 @@ case class UnresolvedWindowExpression(
override lazy val resolved = false
// Unresolved functions are transient at compile time and don't get evaluated during execution.
- override def eval(input: Row = null): Any =
+ override def eval(input: InternalRow = null): Any =
throw new TreeNodeException(this, s"No function to evaluate expression. type: ${this.nodeName}")
}
@@ -327,7 +327,7 @@ case class WindowExpression(
override def children: Seq[Expression] =
windowFunction :: windowSpec :: Nil
- override def eval(input: Row): Any =
+ override def eval(input: InternalRow): Any =
throw new TreeNodeException(this, s"No function to evaluate expression. type: ${this.nodeName}")
override def dataType: DataType = windowFunction.dataType
http://git-wip-us.apache.org/repos/asf/spark/blob/d46f8e5d/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LocalRelation.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LocalRelation.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LocalRelation.scala
index e3e070f..2c946cd 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LocalRelation.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LocalRelation.scala
@@ -17,10 +17,9 @@
package org.apache.spark.sql.catalyst.plans.logical
-import org.apache.spark.sql.Row
-import org.apache.spark.sql.catalyst.{CatalystTypeConverters, analysis}
import org.apache.spark.sql.catalyst.expressions.Attribute
-import org.apache.spark.sql.types.{StructType, StructField}
+import org.apache.spark.sql.catalyst.{InternalRow, CatalystTypeConverters, analysis}
+import org.apache.spark.sql.types.{StructField, StructType}
object LocalRelation {
def apply(output: Attribute*): LocalRelation = new LocalRelation(output)
@@ -32,11 +31,11 @@ object LocalRelation {
def fromProduct(output: Seq[Attribute], data: Seq[Product]): LocalRelation = {
val schema = StructType.fromAttributes(output)
val converter = CatalystTypeConverters.createToCatalystConverter(schema)
- LocalRelation(output, data.map(converter(_).asInstanceOf[Row]))
+ LocalRelation(output, data.map(converter(_).asInstanceOf[InternalRow]))
}
}
-case class LocalRelation(output: Seq[Attribute], data: Seq[Row] = Nil)
+case class LocalRelation(output: Seq[Attribute], data: Seq[InternalRow] = Nil)
extends LeafNode with analysis.MultiInstanceRelation {
/**
http://git-wip-us.apache.org/repos/asf/spark/blob/d46f8e5d/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
index 80ba57a..42dead7 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
@@ -17,8 +17,9 @@
package org.apache.spark.sql.catalyst.plans.physical
+import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.errors.TreeNodeException
-import org.apache.spark.sql.catalyst.expressions.{Expression, Row, SortOrder}
+import org.apache.spark.sql.catalyst.expressions.{Expression, SortOrder}
import org.apache.spark.sql.types.{DataType, IntegerType}
/**
@@ -169,7 +170,7 @@ case class HashPartitioning(expressions: Seq[Expression], numPartitions: Int)
override def keyExpressions: Seq[Expression] = expressions
- override def eval(input: Row = null): Any =
+ override def eval(input: InternalRow = null): Any =
throw new TreeNodeException(this, s"No function to evaluate expression. type: ${this.nodeName}")
}
@@ -213,6 +214,6 @@ case class RangePartitioning(ordering: Seq[SortOrder], numPartitions: Int)
override def keyExpressions: Seq[Expression] = ordering.map(_.child)
- override def eval(input: Row): Any =
+ override def eval(input: InternalRow): Any =
throw new TreeNodeException(this, s"No function to evaluate expression. type: ${this.nodeName}")
}
http://git-wip-us.apache.org/repos/asf/spark/blob/d46f8e5d/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/ScalaReflectionSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/ScalaReflectionSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/ScalaReflectionSuite.scala
index 9a24b23..b4d5e01 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/ScalaReflectionSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/ScalaReflectionSuite.scala
@@ -21,7 +21,7 @@ import java.math.BigInteger
import java.sql.{Date, Timestamp}
import org.apache.spark.SparkFunSuite
-import org.apache.spark.sql.catalyst.expressions.Row
+import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.types._
case class PrimitiveData(
@@ -257,7 +257,7 @@ class ScalaReflectionSuite extends SparkFunSuite {
test("convert PrimitiveData to catalyst") {
val data = PrimitiveData(1, 1, 1, 1, 1, 1, true)
- val convertedData = Row(1, 1.toLong, 1.toDouble, 1.toFloat, 1.toShort, 1.toByte, true)
+ val convertedData = InternalRow(1, 1.toLong, 1.toDouble, 1.toFloat, 1.toShort, 1.toByte, true)
val dataType = schemaFor[PrimitiveData].dataType
assert(CatalystTypeConverters.convertToCatalyst(data, dataType) === convertedData)
}
@@ -267,8 +267,8 @@ class ScalaReflectionSuite extends SparkFunSuite {
val data = OptionalData(Some(2), Some(2), Some(2), Some(2), Some(2), Some(2), Some(true),
Some(primitiveData))
val dataType = schemaFor[OptionalData].dataType
- val convertedData = Row(2, 2.toLong, 2.toDouble, 2.toFloat, 2.toShort, 2.toByte, true,
- Row(1, 1, 1, 1, 1, 1, true))
+ val convertedData = InternalRow(2, 2.toLong, 2.toDouble, 2.toFloat, 2.toShort, 2.toByte, true,
+ InternalRow(1, 1, 1, 1, 1, 1, true))
assert(CatalystTypeConverters.convertToCatalyst(data, dataType) === convertedData)
}
http://git-wip-us.apache.org/repos/asf/spark/blob/d46f8e5d/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CastSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CastSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CastSuite.scala
index 969c6cc..e407f6f 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CastSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CastSuite.scala
@@ -437,14 +437,14 @@ class CastSuite extends SparkFunSuite with ExpressionEvalHelper {
test("cast from struct") {
val struct = Literal.create(
- Row("123", "abc", "", null),
+ InternalRow("123", "abc", "", null),
StructType(Seq(
StructField("a", StringType, nullable = true),
StructField("b", StringType, nullable = true),
StructField("c", StringType, nullable = true),
StructField("d", StringType, nullable = true))))
val struct_notNull = Literal.create(
- Row("123", "abc", ""),
+ InternalRow("123", "abc", ""),
StructType(Seq(
StructField("a", StringType, nullable = false),
StructField("b", StringType, nullable = false),
@@ -457,7 +457,7 @@ class CastSuite extends SparkFunSuite with ExpressionEvalHelper {
StructField("c", IntegerType, nullable = true),
StructField("d", IntegerType, nullable = true))))
assert(ret.resolved === true)
- checkEvaluation(ret, Row(123, null, null, null))
+ checkEvaluation(ret, InternalRow(123, null, null, null))
}
{
val ret = cast(struct, StructType(Seq(
@@ -474,7 +474,7 @@ class CastSuite extends SparkFunSuite with ExpressionEvalHelper {
StructField("c", BooleanType, nullable = true),
StructField("d", BooleanType, nullable = true))))
assert(ret.resolved === true)
- checkEvaluation(ret, Row(true, true, false, null))
+ checkEvaluation(ret, InternalRow(true, true, false, null))
}
{
val ret = cast(struct, StructType(Seq(
@@ -491,7 +491,7 @@ class CastSuite extends SparkFunSuite with ExpressionEvalHelper {
StructField("b", IntegerType, nullable = true),
StructField("c", IntegerType, nullable = true))))
assert(ret.resolved === true)
- checkEvaluation(ret, Row(123, null, null))
+ checkEvaluation(ret, InternalRow(123, null, null))
}
{
val ret = cast(struct_notNull, StructType(Seq(
@@ -506,7 +506,7 @@ class CastSuite extends SparkFunSuite with ExpressionEvalHelper {
StructField("b", BooleanType, nullable = true),
StructField("c", BooleanType, nullable = true))))
assert(ret.resolved === true)
- checkEvaluation(ret, Row(true, true, false))
+ checkEvaluation(ret, InternalRow(true, true, false))
}
{
val ret = cast(struct_notNull, StructType(Seq(
@@ -514,7 +514,7 @@ class CastSuite extends SparkFunSuite with ExpressionEvalHelper {
StructField("b", BooleanType, nullable = true),
StructField("c", BooleanType, nullable = false))))
assert(ret.resolved === true)
- checkEvaluation(ret, Row(true, true, false))
+ checkEvaluation(ret, InternalRow(true, true, false))
}
{
@@ -532,10 +532,10 @@ class CastSuite extends SparkFunSuite with ExpressionEvalHelper {
test("complex casting") {
val complex = Literal.create(
- Row(
+ InternalRow(
Seq("123", "abc", ""),
Map("a" -> "123", "b" -> "abc", "c" -> ""),
- Row(0)),
+ InternalRow(0)),
StructType(Seq(
StructField("a",
ArrayType(StringType, containsNull = false), nullable = true),
@@ -555,10 +555,10 @@ class CastSuite extends SparkFunSuite with ExpressionEvalHelper {
StructField("l", LongType, nullable = true)))))))
assert(ret.resolved === true)
- checkEvaluation(ret, Row(
+ checkEvaluation(ret, InternalRow(
Seq(123, null, null),
Map("a" -> true, "b" -> true, "c" -> false),
- Row(0L)))
+ InternalRow(0L)))
}
}
http://git-wip-us.apache.org/repos/asf/spark/blob/d46f8e5d/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ComplexTypeSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ComplexTypeSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ComplexTypeSuite.scala
index bcc594c..2b0f461 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ComplexTypeSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ComplexTypeSuite.scala
@@ -27,10 +27,10 @@ import org.apache.spark.unsafe.types.UTF8String
class ComplexTypeSuite extends SparkFunSuite with ExpressionEvalHelper {
test("CreateStruct") {
- val row = Row(1, 2, 3)
+ val row = InternalRow(1, 2, 3)
val c1 = 'a.int.at(0).as("a")
val c3 = 'c.int.at(2).as("c")
- checkEvaluation(CreateStruct(Seq(c1, c3)), Row(1, 3), row)
+ checkEvaluation(CreateStruct(Seq(c1, c3)), InternalRow(1, 3), row)
}
test("complex type") {
http://git-wip-us.apache.org/repos/asf/spark/blob/d46f8e5d/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala
index 4a241d3..12d2da8 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala
@@ -32,26 +32,26 @@ import org.apache.spark.sql.catalyst.plans.logical.{OneRowRelation, Project}
trait ExpressionEvalHelper {
self: SparkFunSuite =>
- protected def create_row(values: Any*): Row = {
+ protected def create_row(values: Any*): InternalRow = {
new GenericRow(values.map(CatalystTypeConverters.convertToCatalyst).toArray)
}
protected def checkEvaluation(
- expression: Expression, expected: Any, inputRow: Row = EmptyRow): Unit = {
+ expression: Expression, expected: Any, inputRow: InternalRow = EmptyRow): Unit = {
checkEvaluationWithoutCodegen(expression, expected, inputRow)
checkEvaluationWithGeneratedMutableProjection(expression, expected, inputRow)
checkEvaluationWithGeneratedProjection(expression, expected, inputRow)
checkEvaluationWithOptimization(expression, expected, inputRow)
}
- protected def evaluate(expression: Expression, inputRow: Row = EmptyRow): Any = {
+ protected def evaluate(expression: Expression, inputRow: InternalRow = EmptyRow): Any = {
expression.eval(inputRow)
}
protected def checkEvaluationWithoutCodegen(
expression: Expression,
expected: Any,
- inputRow: Row = EmptyRow): Unit = {
+ inputRow: InternalRow = EmptyRow): Unit = {
val actual = try evaluate(expression, inputRow) catch {
case e: Exception => fail(s"Exception evaluating $expression", e)
}
@@ -66,7 +66,7 @@ trait ExpressionEvalHelper {
protected def checkEvaluationWithGeneratedMutableProjection(
expression: Expression,
expected: Any,
- inputRow: Row = EmptyRow): Unit = {
+ inputRow: InternalRow = EmptyRow): Unit = {
val plan = try {
GenerateMutableProjection.generate(Alias(expression, s"Optimized($expression)")() :: Nil)()
@@ -92,7 +92,7 @@ trait ExpressionEvalHelper {
protected def checkEvaluationWithGeneratedProjection(
expression: Expression,
expected: Any,
- inputRow: Row = EmptyRow): Unit = {
+ inputRow: InternalRow = EmptyRow): Unit = {
val ctx = GenerateProjection.newCodeGenContext()
lazy val evaluated = expression.gen(ctx)
@@ -128,7 +128,7 @@ trait ExpressionEvalHelper {
protected def checkEvaluationWithOptimization(
expression: Expression,
expected: Any,
- inputRow: Row = EmptyRow): Unit = {
+ inputRow: InternalRow = EmptyRow): Unit = {
val plan = Project(Alias(expression, s"Optimized($expression)")() :: Nil, OneRowRelation)
val optimizedPlan = DefaultOptimizer.execute(plan)
checkEvaluationWithoutCodegen(optimizedPlan.expressions.head, expected, inputRow)
@@ -137,7 +137,7 @@ trait ExpressionEvalHelper {
protected def checkDoubleEvaluation(
expression: Expression,
expected: Spread[Double],
- inputRow: Row = EmptyRow): Unit = {
+ inputRow: InternalRow = EmptyRow): Unit = {
val actual = try evaluate(expression, inputRow) catch {
case e: Exception => fail(s"Exception evaluating $expression", e)
}
http://git-wip-us.apache.org/repos/asf/spark/blob/d46f8e5d/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/UnsafeFixedWidthAggregationMapSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/UnsafeFixedWidthAggregationMapSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/UnsafeFixedWidthAggregationMapSuite.scala
index 72bbc4e..7aae2bb 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/UnsafeFixedWidthAggregationMapSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/UnsafeFixedWidthAggregationMapSuite.scala
@@ -37,7 +37,7 @@ class UnsafeFixedWidthAggregationMapSuite
private val groupKeySchema = StructType(StructField("product", StringType) :: Nil)
private val aggBufferSchema = StructType(StructField("salePrice", IntegerType) :: Nil)
- private def emptyAggregationBuffer: Row = new GenericRow(Array[Any](0))
+ private def emptyAggregationBuffer: InternalRow = new GenericRow(Array[Any](0))
private var memoryManager: TaskMemoryManager = null
http://git-wip-us.apache.org/repos/asf/spark/blob/d46f8e5d/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/UnsafeRowConverterSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/UnsafeRowConverterSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/UnsafeRowConverterSuite.scala
index 61722f1..577c7a0 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/UnsafeRowConverterSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/UnsafeRowConverterSuite.scala
@@ -86,7 +86,7 @@ class UnsafeRowConverterSuite extends SparkFunSuite with Matchers {
DoubleType)
val converter = new UnsafeRowConverter(fieldTypes)
- val rowWithAllNullColumns: Row = {
+ val rowWithAllNullColumns: InternalRow = {
val r = new SpecificMutableRow(fieldTypes)
for (i <- 0 to fieldTypes.length - 1) {
r.setNullAt(i)
@@ -117,7 +117,7 @@ class UnsafeRowConverterSuite extends SparkFunSuite with Matchers {
// If we have an UnsafeRow with columns that are initially non-null and we null out those
// columns, then the serialized row representation should be identical to what we would get by
// creating an entirely null row via the converter
- val rowWithNoNullColumns: Row = {
+ val rowWithNoNullColumns: InternalRow = {
val r = new SpecificMutableRow(fieldTypes)
r.setNullAt(0)
r.setBoolean(1, false)
http://git-wip-us.apache.org/repos/asf/spark/blob/d46f8e5d/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ConvertToLocalRelationSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ConvertToLocalRelationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ConvertToLocalRelationSuite.scala
index 6841bd9..54e8c64 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ConvertToLocalRelationSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ConvertToLocalRelationSuite.scala
@@ -17,10 +17,10 @@
package org.apache.spark.sql.catalyst.optimizer
-import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
-import org.apache.spark.sql.catalyst.dsl.plans._
import org.apache.spark.sql.catalyst.dsl.expressions._
+import org.apache.spark.sql.catalyst.dsl.plans._
+import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.PlanTest
import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
import org.apache.spark.sql.catalyst.rules.RuleExecutor
@@ -37,13 +37,11 @@ class ConvertToLocalRelationSuite extends PlanTest {
test("Project on LocalRelation should be turned into a single LocalRelation") {
val testRelation = LocalRelation(
LocalRelation('a.int, 'b.int).output,
- Row(1, 2) ::
- Row(4, 5) :: Nil)
+ InternalRow(1, 2) :: InternalRow(4, 5) :: Nil)
val correctAnswer = LocalRelation(
LocalRelation('a1.int, 'b1.int).output,
- Row(1, 3) ::
- Row(4, 6) :: Nil)
+ InternalRow(1, 3) :: InternalRow(4, 6) :: Nil)
val projectOnLocal = testRelation.select(
UnresolvedAttribute("a").as("a1"),
http://git-wip-us.apache.org/repos/asf/spark/blob/d46f8e5d/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/TreeNodeSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/TreeNodeSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/TreeNodeSuite.scala
index 8ec79c3..bda2179 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/TreeNodeSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/TreeNodeSuite.scala
@@ -28,7 +28,7 @@ case class Dummy(optKey: Option[Expression]) extends Expression {
override def nullable: Boolean = true
override def dataType: NullType = NullType
override lazy val resolved = true
- override def eval(input: Row): Any = null.asInstanceOf[Any]
+ override def eval(input: InternalRow): Any = null.asInstanceOf[Any]
}
case class ComplexPlan(exprs: Seq[Seq[Expression]])
http://git-wip-us.apache.org/repos/asf/spark/blob/d46f8e5d/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateUtilsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateUtilsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateUtilsSuite.scala
index a424554..4d8fe4a 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateUtilsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateUtilsSuite.scala
@@ -21,7 +21,6 @@ import java.sql.Timestamp
import org.apache.spark.SparkFunSuite
-
class DateUtilsSuite extends SparkFunSuite {
test("timestamp") {
http://git-wip-us.apache.org/repos/asf/spark/blob/d46f8e5d/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
index f041fd3..f1acdfe 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
@@ -20,7 +20,6 @@ package org.apache.spark.sql
import java.io.CharArrayWriter
import java.util.Properties
-import scala.collection.JavaConversions._
import scala.language.implicitConversions
import scala.reflect.ClassTag
import scala.reflect.runtime.universe.TypeTag
@@ -33,7 +32,7 @@ import org.apache.spark.annotation.{DeveloperApi, Experimental}
import org.apache.spark.api.java.JavaRDD
import org.apache.spark.api.python.SerDeUtil
import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.catalyst.analysis.{MultiAlias, ResolvedStar, UnresolvedAttribute, UnresolvedRelation}
+import org.apache.spark.sql.catalyst.analysis.{MultiAlias, ResolvedStar, UnresolvedAttribute}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.logical.{Filter, _}
import org.apache.spark.sql.catalyst.plans.{Inner, JoinType}
@@ -1032,7 +1031,8 @@ class DataFrame private[sql](
val names = schema.toAttributes.map(_.name)
val rowFunction =
- f.andThen(_.map(CatalystTypeConverters.convertToCatalyst(_, schema).asInstanceOf[Row]))
+ f.andThen(_.map(CatalystTypeConverters.convertToCatalyst(_, schema)
+ .asInstanceOf[InternalRow]))
val generator = UserDefinedGenerator(elementTypes, rowFunction, input.map(_.expr))
Generate(generator, join = true, outer = false,
@@ -1058,8 +1058,9 @@ class DataFrame private[sql](
val elementTypes = attributes.map { attr => (attr.dataType, attr.nullable) }
val names = attributes.map(_.name)
- def rowFunction(row: Row): TraversableOnce[Row] = {
- f(row(0).asInstanceOf[A]).map(o => Row(CatalystTypeConverters.convertToCatalyst(o, dataType)))
+ def rowFunction(row: Row): TraversableOnce[InternalRow] = {
+ f(row(0).asInstanceOf[A]).map(o =>
+ catalyst.InternalRow(CatalystTypeConverters.convertToCatalyst(o, dataType)))
}
val generator = UserDefinedGenerator(elementTypes, rowFunction, apply(inputColumn).expr :: Nil)
@@ -1221,7 +1222,7 @@ class DataFrame private[sql](
val outputCols = (if (cols.isEmpty) numericColumns.map(_.prettyString) else cols).toList
- val ret: Seq[Row] = if (outputCols.nonEmpty) {
+ val ret: Seq[InternalRow] = if (outputCols.nonEmpty) {
val aggExprs = statistics.flatMap { case (_, colToAgg) =>
outputCols.map(c => Column(Cast(colToAgg(Column(c).expr), StringType)).as(c))
}
@@ -1230,11 +1231,12 @@ class DataFrame private[sql](
// Pivot the data so each summary is one row
row.grouped(outputCols.size).toSeq.zip(statistics).map {
- case (aggregation, (statistic, _)) => Row(statistic :: aggregation.toList: _*)
+ case (aggregation, (statistic, _)) =>
+ catalyst.InternalRow(statistic :: aggregation.toList: _*)
}
} else {
// If there are no output columns, just output a single column that contains the stats.
- statistics.map { case (name, _) => Row(name) }
+ statistics.map { case (name, _) => catalyst.InternalRow(name) }
}
// All columns are string type
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org