You are viewing a plain-text version of this content. The canonical link to it was a hyperlink ("here") that is not preserved in this plain-text copy.
Posted to commits@spark.apache.org by da...@apache.org on 2015/12/16 19:22:56 UTC

spark git commit: [SPARK-8745] [SQL] remove GenerateProjection

Repository: spark
Updated Branches:
  refs/heads/master a6325fc40 -> 54c512ba9


[SPARK-8745] [SQL] remove GenerateProjection

cc rxin

Author: Davies Liu <da...@databricks.com>

Closes #10316 from davies/remove_generate_projection.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/54c512ba
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/54c512ba
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/54c512ba

Branch: refs/heads/master
Commit: 54c512ba906edfc25b8081ad67498e99d884452b
Parents: a6325fc
Author: Davies Liu <da...@databricks.com>
Authored: Wed Dec 16 10:22:48 2015 -0800
Committer: Davies Liu <da...@gmail.com>
Committed: Wed Dec 16 10:22:48 2015 -0800

----------------------------------------------------------------------
 .../codegen/GenerateProjection.scala            | 238 -------------------
 .../codegen/GenerateSafeProjection.scala        |   4 +
 .../expressions/CodeGenerationSuite.scala       |   5 +-
 .../expressions/ExpressionEvalHelper.scala      |  39 +--
 .../expressions/MathFunctionsSuite.scala        |   4 +-
 .../codegen/CodegenExpressionCachingSuite.scala |  18 --
 .../spark/sql/execution/local/ExpandNode.scala  |   4 +-
 .../spark/sql/execution/local/LocalNode.scala   |  18 --
 8 files changed, 11 insertions(+), 319 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/54c512ba/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateProjection.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateProjection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateProjection.scala
deleted file mode 100644
index f229f20..0000000
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateProjection.scala
+++ /dev/null
@@ -1,238 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.catalyst.expressions.codegen
-
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.types._
-
-/**
- * Java can not access Projection (in package object)
- */
-abstract class BaseProjection extends Projection {}
-
-abstract class CodeGenMutableRow extends MutableRow with BaseGenericInternalRow
-
-/**
- * Generates bytecode that produces a new [[InternalRow]] object based on a fixed set of input
- * [[Expression Expressions]] and a given input [[InternalRow]].  The returned [[InternalRow]]
- * object is custom generated based on the output types of the [[Expression]] to avoid boxing of
- * primitive values.
- */
-object GenerateProjection extends CodeGenerator[Seq[Expression], Projection] {
-
-  protected def canonicalize(in: Seq[Expression]): Seq[Expression] =
-    in.map(ExpressionCanonicalizer.execute)
-
-  protected def bind(in: Seq[Expression], inputSchema: Seq[Attribute]): Seq[Expression] =
-    in.map(BindReferences.bindReference(_, inputSchema))
-
-  // Make Mutablility optional...
-  protected def create(expressions: Seq[Expression]): Projection = {
-    val ctx = newCodeGenContext()
-    val columns = expressions.zipWithIndex.map {
-      case (e, i) =>
-        s"private ${ctx.javaType(e.dataType)} c$i = ${ctx.defaultValue(e.dataType)};\n"
-    }.mkString("\n")
-
-    val initColumns = expressions.zipWithIndex.map {
-      case (e, i) =>
-        val eval = e.gen(ctx)
-        s"""
-        {
-          // column$i
-          ${eval.code}
-          nullBits[$i] = ${eval.isNull};
-          if (!${eval.isNull}) {
-            c$i = ${eval.value};
-          }
-        }
-        """
-    }.mkString("\n")
-
-    val getCases = (0 until expressions.size).map { i =>
-      s"case $i: return c$i;"
-    }.mkString("\n")
-
-    val updateCases = expressions.zipWithIndex.map { case (e, i) =>
-      s"case $i: { c$i = (${ctx.boxedType(e.dataType)})value; return;}"
-    }.mkString("\n")
-
-    val specificAccessorFunctions = ctx.primitiveTypes.map { jt =>
-      val cases = expressions.zipWithIndex.flatMap {
-        case (e, i) if ctx.javaType(e.dataType) == jt =>
-          Some(s"case $i: return c$i;")
-        case _ => None
-      }.mkString("\n")
-      if (cases.length > 0) {
-        val getter = "get" + ctx.primitiveTypeName(jt)
-        s"""
-      public $jt $getter(int i) {
-        if (isNullAt(i)) {
-          return ${ctx.defaultValue(jt)};
-        }
-        switch (i) {
-        $cases
-        }
-        throw new IllegalArgumentException("Invalid index: " + i
-          + " in $getter");
-      }"""
-      } else {
-        ""
-      }
-    }.filter(_.length > 0).mkString("\n")
-
-    val specificMutatorFunctions = ctx.primitiveTypes.map { jt =>
-      val cases = expressions.zipWithIndex.flatMap {
-        case (e, i) if ctx.javaType(e.dataType) == jt =>
-          Some(s"case $i: { c$i = value; return; }")
-        case _ => None
-      }.mkString("\n")
-      if (cases.length > 0) {
-        val setter = "set" + ctx.primitiveTypeName(jt)
-        s"""
-      public void $setter(int i, $jt value) {
-        nullBits[i] = false;
-        switch (i) {
-        $cases
-        }
-        throw new IllegalArgumentException("Invalid index: " + i +
-          " in $setter}");
-      }"""
-      } else {
-        ""
-      }
-    }.filter(_.length > 0).mkString("\n")
-
-    val hashValues = expressions.zipWithIndex.map { case (e, i) =>
-      val col = s"c$i"
-      val nonNull = e.dataType match {
-        case BooleanType => s"$col ? 0 : 1"
-        case ByteType | ShortType | IntegerType | DateType => s"$col"
-        case LongType | TimestampType => s"$col ^ ($col >>> 32)"
-        case FloatType => s"Float.floatToIntBits($col)"
-        case DoubleType =>
-            s"(int)(Double.doubleToLongBits($col) ^ (Double.doubleToLongBits($col) >>> 32))"
-        case BinaryType => s"java.util.Arrays.hashCode($col)"
-        case _ => s"$col.hashCode()"
-      }
-      s"isNullAt($i) ? 0 : ($nonNull)"
-    }
-
-    val hashUpdates: String = hashValues.map( v =>
-      s"""
-        result *= 37; result += $v;"""
-    ).mkString("\n")
-
-    val columnChecks = expressions.zipWithIndex.map { case (e, i) =>
-      s"""
-        if (nullBits[$i] != row.nullBits[$i] ||
-          (!nullBits[$i] && !(${ctx.genEqual(e.dataType, s"c$i", s"row.c$i")}))) {
-          return false;
-        }
-      """
-    }.mkString("\n")
-
-    val copyColumns = expressions.zipWithIndex.map { case (e, i) =>
-        s"""if (!nullBits[$i]) arr[$i] = c$i;"""
-    }.mkString("\n")
-
-    val code = s"""
-    public SpecificProjection generate($exprType[] expr) {
-      return new SpecificProjection(expr);
-    }
-
-    class SpecificProjection extends ${classOf[BaseProjection].getName} {
-      private $exprType[] expressions;
-      ${declareMutableStates(ctx)}
-      ${declareAddedFunctions(ctx)}
-
-      public SpecificProjection($exprType[] expr) {
-        expressions = expr;
-        ${initMutableStates(ctx)}
-      }
-
-      public java.lang.Object apply(java.lang.Object r) {
-        // GenerateProjection does not work with UnsafeRows.
-        assert(!(r instanceof ${classOf[UnsafeRow].getName}));
-        return new SpecificRow((InternalRow) r);
-      }
-
-      final class SpecificRow extends ${classOf[CodeGenMutableRow].getName} {
-
-        $columns
-
-        public SpecificRow(InternalRow ${ctx.INPUT_ROW}) {
-          $initColumns
-        }
-
-        public int numFields() { return ${expressions.length};}
-        protected boolean[] nullBits = new boolean[${expressions.length}];
-        public void setNullAt(int i) { nullBits[i] = true; }
-        public boolean isNullAt(int i) { return nullBits[i]; }
-
-        public java.lang.Object genericGet(int i) {
-          if (isNullAt(i)) return null;
-          switch (i) {
-          $getCases
-          }
-          return null;
-        }
-        public void update(int i, java.lang.Object value) {
-          if (value == null) {
-            setNullAt(i);
-            return;
-          }
-          nullBits[i] = false;
-          switch (i) {
-          $updateCases
-          }
-        }
-        $specificAccessorFunctions
-        $specificMutatorFunctions
-
-        public int hashCode() {
-          int result = 37;
-          $hashUpdates
-          return result;
-        }
-
-        public boolean equals(java.lang.Object other) {
-          if (other instanceof SpecificRow) {
-            SpecificRow row = (SpecificRow) other;
-            $columnChecks
-            return true;
-          }
-          return super.equals(other);
-        }
-
-        public InternalRow copy() {
-          java.lang.Object[] arr = new java.lang.Object[${expressions.length}];
-          ${copyColumns}
-          return new ${classOf[GenericInternalRow].getName}(arr);
-        }
-      }
-    }
-    """
-
-    logDebug(s"MutableRow, initExprs: ${expressions.mkString(",")} code:\n" +
-      CodeFormatter.format(code))
-
-    compile(code).generate(ctx.references.toArray).asInstanceOf[Projection]
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/54c512ba/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateSafeProjection.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateSafeProjection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateSafeProjection.scala
index b7926bd..13634b6 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateSafeProjection.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateSafeProjection.scala
@@ -22,6 +22,10 @@ import org.apache.spark.sql.catalyst.expressions.aggregate.NoOp
 import org.apache.spark.sql.catalyst.util.{GenericArrayData, ArrayBasedMapData}
 import org.apache.spark.sql.types._
 
+/**
+ * Java can not access Projection (in package object)
+ */
+abstract class BaseProjection extends Projection {}
 
 /**
  * Generates byte code that produces a [[MutableRow]] object (not an [[UnsafeRow]]) that can update

http://git-wip-us.apache.org/repos/asf/spark/blob/54c512ba/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGenerationSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGenerationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGenerationSuite.scala
index cd2ef7d..0c42e2f 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGenerationSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGenerationSuite.scala
@@ -18,8 +18,8 @@
 package org.apache.spark.sql.catalyst.expressions
 
 import org.apache.spark.SparkFunSuite
-import org.apache.spark.sql.{Row, RandomDataGenerator}
-import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.dsl.expressions._
 import org.apache.spark.sql.catalyst.expressions.codegen._
 import org.apache.spark.sql.types._
@@ -38,7 +38,6 @@ class CodeGenerationSuite extends SparkFunSuite with ExpressionEvalHelper {
     val futures = (1 to 20).map { _ =>
       future {
         GeneratePredicate.generate(EqualTo(Literal(1), Literal(1)))
-        GenerateProjection.generate(EqualTo(Literal(1), Literal(1)) :: Nil)
         GenerateMutableProjection.generate(EqualTo(Literal(1), Literal(1)) :: Nil)
         GenerateOrdering.generate(Add(Literal(1), Literal(1)).asc :: Nil)
       }

http://git-wip-us.apache.org/repos/asf/spark/blob/54c512ba/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala
index 465f7d0..f869a96 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala
@@ -43,7 +43,6 @@ trait ExpressionEvalHelper extends GeneratorDrivenPropertyChecks {
     val catalystValue = CatalystTypeConverters.convertToCatalyst(expected)
     checkEvaluationWithoutCodegen(expression, catalystValue, inputRow)
     checkEvaluationWithGeneratedMutableProjection(expression, catalystValue, inputRow)
-    checkEvaluationWithGeneratedProjection(expression, catalystValue, inputRow)
     if (GenerateUnsafeProjection.canSupport(expression.dataType)) {
       checkEvalutionWithUnsafeProjection(expression, catalystValue, inputRow)
     }
@@ -120,42 +119,6 @@ trait ExpressionEvalHelper extends GeneratorDrivenPropertyChecks {
     }
   }
 
-  protected def checkEvaluationWithGeneratedProjection(
-      expression: Expression,
-      expected: Any,
-      inputRow: InternalRow = EmptyRow): Unit = {
-
-    val plan = generateProject(
-      GenerateProjection.generate(Alias(expression, s"Optimized($expression)")() :: Nil),
-      expression)
-
-    val actual = plan(inputRow)
-    val expectedRow = InternalRow(expected)
-
-    // We reimplement hashCode in generated `SpecificRow`, make sure it's consistent with our
-    // interpreted version.
-    if (actual.hashCode() != expectedRow.hashCode()) {
-      val ctx = new CodeGenContext
-      val evaluated = expression.gen(ctx)
-      fail(
-        s"""
-          |Mismatched hashCodes for values: $actual, $expectedRow
-          |Hash Codes: ${actual.hashCode()} != ${expectedRow.hashCode()}
-          |Expressions: $expression
-          |Code: $evaluated
-        """.stripMargin)
-    }
-
-    if (actual != expectedRow) {
-      val input = if (inputRow == EmptyRow) "" else s", input: $inputRow"
-      fail("Incorrect Evaluation in codegen mode: " +
-        s"$expression, actual: $actual, expected: $expectedRow$input")
-    }
-    if (actual.copy() != expectedRow) {
-      fail(s"Copy of generated Row is wrong: actual: ${actual.copy()}, expected: $expectedRow")
-    }
-  }
-
   protected def checkEvalutionWithUnsafeProjection(
       expression: Expression,
       expected: Any,
@@ -202,7 +165,7 @@ trait ExpressionEvalHelper extends GeneratorDrivenPropertyChecks {
     checkEvaluationWithOptimization(expression, expected)
 
     var plan = generateProject(
-      GenerateProjection.generate(Alias(expression, s"Optimized($expression)")() :: Nil),
+      GenerateMutableProjection.generate(Alias(expression, s"Optimized($expression)")() :: Nil)(),
       expression)
     var actual = plan(inputRow).get(0, expression.dataType)
     assert(checkResult(actual, expected))

http://git-wip-us.apache.org/repos/asf/spark/blob/54c512ba/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/MathFunctionsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/MathFunctionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/MathFunctionsSuite.scala
index 88ed9fd..4ad65db 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/MathFunctionsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/MathFunctionsSuite.scala
@@ -20,9 +20,9 @@ package org.apache.spark.sql.catalyst.expressions
 import com.google.common.math.LongMath
 
 import org.apache.spark.SparkFunSuite
-import org.apache.spark.sql.catalyst.dsl.expressions._
-import org.apache.spark.sql.catalyst.expressions.codegen.{GenerateProjection, GenerateMutableProjection}
 import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.dsl.expressions._
+import org.apache.spark.sql.catalyst.expressions.codegen.GenerateMutableProjection
 import org.apache.spark.sql.catalyst.optimizer.DefaultOptimizer
 import org.apache.spark.sql.catalyst.plans.logical.{OneRowRelation, Project}
 import org.apache.spark.sql.types._

http://git-wip-us.apache.org/repos/asf/spark/blob/54c512ba/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodegenExpressionCachingSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodegenExpressionCachingSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodegenExpressionCachingSuite.scala
index 2d3f98d..c9616cd 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodegenExpressionCachingSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodegenExpressionCachingSuite.scala
@@ -34,12 +34,6 @@ class CodegenExpressionCachingSuite extends SparkFunSuite {
     assert(instance.apply(null).getBoolean(0) === false)
   }
 
-  test("GenerateProjection should initialize expressions") {
-    val expr = And(NondeterministicExpression(), NondeterministicExpression())
-    val instance = GenerateProjection.generate(Seq(expr))
-    assert(instance.apply(null).getBoolean(0) === false)
-  }
-
   test("GenerateMutableProjection should initialize expressions") {
     val expr = And(NondeterministicExpression(), NondeterministicExpression())
     val instance = GenerateMutableProjection.generate(Seq(expr))()
@@ -64,18 +58,6 @@ class CodegenExpressionCachingSuite extends SparkFunSuite {
     assert(instance2.apply(null).getBoolean(0) === true)
   }
 
-  test("GenerateProjection should not share expression instances") {
-    val expr1 = MutableExpression()
-    val instance1 = GenerateProjection.generate(Seq(expr1))
-    assert(instance1.apply(null).getBoolean(0) === false)
-
-    val expr2 = MutableExpression()
-    expr2.mutableState = true
-    val instance2 = GenerateProjection.generate(Seq(expr2))
-    assert(instance1.apply(null).getBoolean(0) === false)
-    assert(instance2.apply(null).getBoolean(0) === true)
-  }
-
   test("GenerateMutableProjection should not share expression instances") {
     val expr1 = MutableExpression()
     val instance1 = GenerateMutableProjection.generate(Seq(expr1))()

http://git-wip-us.apache.org/repos/asf/spark/blob/54c512ba/sql/core/src/main/scala/org/apache/spark/sql/execution/local/ExpandNode.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/local/ExpandNode.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/local/ExpandNode.scala
index 2aff156..85111bd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/local/ExpandNode.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/local/ExpandNode.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.local
 
 import org.apache.spark.sql.SQLConf
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, Projection}
+import org.apache.spark.sql.catalyst.expressions._
 
 case class ExpandNode(
     conf: SQLConf,
@@ -36,7 +36,7 @@ case class ExpandNode(
 
   override def open(): Unit = {
     child.open()
-    groups = projections.map(ee => newProjection(ee, child.output)).toArray
+    groups = projections.map(ee => newMutableProjection(ee, child.output)()).toArray
     idx = groups.length
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/54c512ba/sql/core/src/main/scala/org/apache/spark/sql/execution/local/LocalNode.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/local/LocalNode.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/local/LocalNode.scala
index d3381ea..6a882c9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/local/LocalNode.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/local/LocalNode.scala
@@ -103,24 +103,6 @@ abstract class LocalNode(conf: SQLConf) extends QueryPlan[LocalNode] with Loggin
     result
   }
 
-  protected def newProjection(
-      expressions: Seq[Expression],
-      inputSchema: Seq[Attribute]): Projection = {
-    log.debug(
-      s"Creating Projection: $expressions, inputSchema: $inputSchema")
-    try {
-      GenerateProjection.generate(expressions, inputSchema)
-    } catch {
-      case NonFatal(e) =>
-        if (isTesting) {
-          throw e
-        } else {
-          log.error("Failed to generate projection, fallback to interpret", e)
-          new InterpretedProjection(expressions, inputSchema)
-        }
-    }
-  }
-
   protected def newMutableProjection(
       expressions: Seq[Expression],
       inputSchema: Seq[Attribute]): () => MutableProjection = {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org