You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by da...@apache.org on 2015/07/09 18:20:27 UTC

spark git commit: [SPARK-8931] [SQL] Fallback to interpreted evaluation if failed to compile in codegen

Repository: spark
Updated Branches:
  refs/heads/master f88b12537 -> 23448a9e9


[SPARK-8931] [SQL] Fallback to interpreted evaluation if failed to compile in codegen

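Exceptions will not be caught during tests (i.e. when the `spark.testing` system property is set); they are rethrown so that codegen failures still surface.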

cc marmbrus rxin

Author: Davies Liu <da...@databricks.com>

Closes #7309 from davies/fallback and squashes the following commits:

969a612 [Davies Liu] throw exception during tests
f844f77 [Davies Liu] fallback
a3091bc [Davies Liu] Merge branch 'master' of github.com:apache/spark into fallback
364a0d6 [Davies Liu] fallback to interpret mode if failed to compile


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/23448a9e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/23448a9e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/23448a9e

Branch: refs/heads/master
Commit: 23448a9e988a1b92bd05ee8c6c1a096c83375a12
Parents: f88b125
Author: Davies Liu <da...@databricks.com>
Authored: Thu Jul 9 09:20:16 2015 -0700
Committer: Davies Liu <da...@gmail.com>
Committed: Thu Jul 9 09:20:16 2015 -0700

----------------------------------------------------------------------
 .../apache/spark/sql/execution/SparkPlan.scala  | 51 ++++++++++++++++++--
 .../org/apache/spark/sql/sources/commands.scala | 13 ++++-
 2 files changed, 58 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/23448a9e/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
index ca53186..4d7d862 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
@@ -153,12 +153,24 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
     buf.toArray.map(converter(_).asInstanceOf[Row])
   }
 
+  private[this] def isTesting: Boolean = sys.props.contains("spark.testing")
+
   protected def newProjection(
       expressions: Seq[Expression], inputSchema: Seq[Attribute]): Projection = {
     log.debug(
       s"Creating Projection: $expressions, inputSchema: $inputSchema, codegen:$codegenEnabled")
     if (codegenEnabled) {
-      GenerateProjection.generate(expressions, inputSchema)
+      try {
+        GenerateProjection.generate(expressions, inputSchema)
+      } catch {
+        case e: Exception =>
+          if (isTesting) {
+            throw e
+          } else {
+            log.error("Failed to generate projection, fallback to interpret", e)
+            new InterpretedProjection(expressions, inputSchema)
+          }
+      }
     } else {
       new InterpretedProjection(expressions, inputSchema)
     }
@@ -170,17 +182,36 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
     log.debug(
       s"Creating MutableProj: $expressions, inputSchema: $inputSchema, codegen:$codegenEnabled")
     if(codegenEnabled) {
-      GenerateMutableProjection.generate(expressions, inputSchema)
+      try {
+        GenerateMutableProjection.generate(expressions, inputSchema)
+      } catch {
+        case e: Exception =>
+          if (isTesting) {
+            throw e
+          } else {
+            log.error("Failed to generate mutable projection, fallback to interpreted", e)
+            () => new InterpretedMutableProjection(expressions, inputSchema)
+          }
+      }
     } else {
       () => new InterpretedMutableProjection(expressions, inputSchema)
     }
   }
 
-
   protected def newPredicate(
       expression: Expression, inputSchema: Seq[Attribute]): (InternalRow) => Boolean = {
     if (codegenEnabled) {
-      GeneratePredicate.generate(expression, inputSchema)
+      try {
+        GeneratePredicate.generate(expression, inputSchema)
+      } catch {
+        case e: Exception =>
+          if (isTesting) {
+            throw e
+          } else {
+            log.error("Failed to generate predicate, fallback to interpreted", e)
+            InterpretedPredicate.create(expression, inputSchema)
+          }
+      }
     } else {
       InterpretedPredicate.create(expression, inputSchema)
     }
@@ -190,7 +221,17 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
       order: Seq[SortOrder],
       inputSchema: Seq[Attribute]): Ordering[InternalRow] = {
     if (codegenEnabled) {
-      GenerateOrdering.generate(order, inputSchema)
+      try {
+        GenerateOrdering.generate(order, inputSchema)
+      } catch {
+        case e: Exception =>
+          if (isTesting) {
+            throw e
+          } else {
+            log.error("Failed to generate ordering, fallback to interpreted", e)
+            new RowOrdering(order, inputSchema)
+          }
+      }
     } else {
       new RowOrdering(order, inputSchema)
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/23448a9e/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
index ecbc889..9189d17 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
@@ -276,7 +276,18 @@ private[sql] case class InsertIntoHadoopFsRelation(
     log.debug(
       s"Creating Projection: $expressions, inputSchema: $inputSchema, codegen:$codegenEnabled")
     if (codegenEnabled) {
-      GenerateProjection.generate(expressions, inputSchema)
+
+      try {
+        GenerateProjection.generate(expressions, inputSchema)
+      } catch {
+        case e: Exception =>
+          if (sys.props.contains("spark.testing")) {
+            throw e
+          } else {
+            log.error("failed to generate projection, fallback to interpreted", e)
+            new InterpretedProjection(expressions, inputSchema)
+          }
+      }
     } else {
       new InterpretedProjection(expressions, inputSchema)
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org