You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by hv...@apache.org on 2016/09/27 17:57:23 UTC

spark git commit: [SPARK-17618] Fix invalid comparisons between UnsafeRow and other row formats

Repository: spark
Updated Branches:
  refs/heads/branch-1.6 7aded55e7 -> e2ce0caed


[SPARK-17618] Fix invalid comparisons between UnsafeRow and other row formats

## What changes were proposed in this pull request?

This patch addresses a correctness bug in Spark 1.6.x in where `coalesce()` declares that it can process `UnsafeRows` but mis-declares that it always outputs safe rows. If UnsafeRow and other Row types are compared for equality then we will get spurious `false` comparisons, leading to wrong answers in operators which perform whole-row comparison (such as `distinct()` or `except()`). An example of a query impacted by this bug is given in the [JIRA ticket](https://issues.apache.org/jira/browse/SPARK-17618).

The problem is that the validity of our row format conversion rules depends on operators which handle `unsafeRows` (signalled by overriding `canProcessUnsafeRows`) correctly reporting their output row format (which is done by overriding `outputsUnsafeRows`). In #9024, we overrode `canProcessUnsafeRows` but forgot to override `outputsUnsafeRows`, leading to the incorrect `equals()` comparison.

Our interface design is flawed because correctness depends on operators correctly overriding multiple methods this problem could have been prevented by a design which coupled row format methods / metadata into a single method / class so that all three methods had to be overridden at the same time.

This patch addresses this issue by adding missing `outputsUnsafeRows` overrides. In order to ensure that bugs in this logic are uncovered sooner, I have modified `UnsafeRow.equals()` to throw an `IllegalArgumentException` if it is called with an object that is not an `UnsafeRow`.

## How was this patch tested?

I believe that the stronger misuse-checking in `UnsafeRow.equals()` is sufficient to detect and prevent this class of bug.

Author: Josh Rosen <jo...@databricks.com>

Closes #15185 from JoshRosen/SPARK-17618.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e2ce0cae
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e2ce0cae
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e2ce0cae

Branch: refs/heads/branch-1.6
Commit: e2ce0caed9cc2380cc24d61a0685a4534f21a7f3
Parents: 7aded55
Author: Josh Rosen <jo...@databricks.com>
Authored: Tue Sep 27 10:57:15 2016 -0700
Committer: Herman van Hovell <hv...@databricks.com>
Committed: Tue Sep 27 10:57:15 2016 -0700

----------------------------------------------------------------------
 .../org/apache/spark/sql/catalyst/expressions/UnsafeRow.java  | 7 ++++++-
 .../main/scala/org/apache/spark/sql/execution/Window.scala    | 1 +
 .../scala/org/apache/spark/sql/execution/basicOperators.scala | 7 +++++--
 3 files changed, 12 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/e2ce0cae/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java
index af687ea..5555b54 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java
@@ -30,6 +30,7 @@ import java.util.Collections;
 import java.util.HashSet;
 import java.util.Set;
 
+import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.types.ArrayType;
 import org.apache.spark.sql.types.BinaryType;
 import org.apache.spark.sql.types.BooleanType;
@@ -610,8 +611,12 @@ public final class UnsafeRow extends MutableRow implements Externalizable, KryoS
       return (sizeInBytes == o.sizeInBytes) &&
         ByteArrayMethods.arrayEquals(baseObject, baseOffset, o.baseObject, o.baseOffset,
           sizeInBytes);
+    } else if (other == null || !(other instanceof InternalRow)) {
+      return false;
+    } else {
+      throw new IllegalArgumentException(
+        "Cannot compare UnsafeRow to " + other.getClass().getName());
     }
-    return false;
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/e2ce0cae/sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala
index b1280c3..b3f1580 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala
@@ -96,6 +96,7 @@ case class Window(
   override def outputOrdering: Seq[SortOrder] = child.outputOrdering
 
   override def canProcessUnsafeRows: Boolean = true
+  override def outputsUnsafeRows: Boolean = false
 
   /**
    * Create a bound ordering object for a given frame type and offset. A bound ordering object is

http://git-wip-us.apache.org/repos/asf/spark/blob/e2ce0cae/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
index a42aea0..58d5669 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
@@ -251,6 +251,7 @@ case class Coalesce(numPartitions: Int, child: SparkPlan) extends UnaryNode {
   }
 
   override def canProcessUnsafeRows: Boolean = true
+  override def outputsUnsafeRows: Boolean = child.outputsUnsafeRows
 }
 
 /**
@@ -319,6 +320,7 @@ case class AppendColumns[T, U](
   // We are using an unsafe combiner.
   override def canProcessSafeRows: Boolean = false
   override def canProcessUnsafeRows: Boolean = true
+  override def outputsUnsafeRows: Boolean = true
 
   override def output: Seq[Attribute] = child.output ++ newColumns
 
@@ -326,10 +328,11 @@ case class AppendColumns[T, U](
     child.execute().mapPartitionsInternal { iter =>
       val tBoundEncoder = tEncoder.bind(child.output)
       val combiner = GenerateUnsafeRowJoiner.create(tEncoder.schema, uEncoder.schema)
-      iter.map { row =>
+      val unsafeRows: Iterator[UnsafeRow] = iter.map { row =>
         val newColumns = uEncoder.toRow(func(tBoundEncoder.fromRow(row)))
-        combiner.join(row.asInstanceOf[UnsafeRow], newColumns.asInstanceOf[UnsafeRow]): InternalRow
+        combiner.join(row.asInstanceOf[UnsafeRow], newColumns.asInstanceOf[UnsafeRow])
       }
+      unsafeRows
     }
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org