Posted to issues@spark.apache.org by "Michael Munday (Jira)" <ji...@apache.org> on 2020/09/17 20:44:00 UTC

[jira] [Created] (SPARK-32929) StreamSuite failure on IBM Z: - SPARK-20432: union one stream with itself

Michael Munday created SPARK-32929:
--------------------------------------

             Summary: StreamSuite failure on IBM Z: - SPARK-20432: union one stream with itself
                 Key: SPARK-32929
                 URL: https://issues.apache.org/jira/browse/SPARK-32929
             Project: Spark
          Issue Type: Bug
          Components: SQL
    Affects Versions: 3.0.1
         Environment: openjdk version "11.0.8" 2020-07-14
OpenJDK Runtime Environment AdoptOpenJDK (build 11.0.8+10)
OpenJDK 64-Bit Server VM AdoptOpenJDK (build 11.0.8+10, mixed mode)

Linux 4.15.0-117-generic #118-Ubuntu SMP Fri Sep 4 20:00:20 UTC 2020 s390x s390x s390x GNU/Linux

            Reporter: Michael Munday


The StreamSuite test "SPARK-20432: union one stream with itself" produces all zeros in its output on IBM Z (s390x), which is a big-endian system. See the error below.

I think this issue is related to the use of {{IntegerType}} in the schema for {{FakeDefaultSource}}. Modifying the schema to use {{LongType}} fixes the issue. Another workaround is to remove {{.select("a")}} from the test (see the patches below).

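My working theory is that long data (Range generates longs) is being read using unsafe int operations, because the schema specifies {{IntegerType}}. On a little-endian system the low-order 4 bytes of an 8-byte long are stored first, so a 4-byte read at the same address returns the correct value for small numbers and the test appears to work. On a big-endian system the same read returns the high-order 4 bytes, which are zero for the values 0 to 10. I'm still working to figure out what the mechanism is, and I'd appreciate any hints or insights.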
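
To illustrate the hypothesis only (this is not Spark's actual code path, which is still unidentified), here is a minimal standalone sketch using {{java.nio.ByteBuffer}}. The class name {{EndianDemo}} and the method {{readLongSlotAsInt}} are made up for this example. It stores a long in an 8-byte slot and then reads the first 4 bytes of that slot as an int, under each byte order.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

// Hypothetical demo class; it only mimics the suspected mismatch between
// the type that was written (long) and the type that is read (int).
public class EndianDemo {

    // Write `value` as an 8-byte long using the given byte order, then read
    // the 4 bytes at the start of the same slot back as an int.
    static int readLongSlotAsInt(long value, ByteOrder order) {
        ByteBuffer slot = ByteBuffer.allocate(8).order(order);
        slot.putLong(0, value);
        return slot.getInt(0);
    }

    public static void main(String[] args) {
        // Same range of values as the failing test (0 to 10).
        for (long v = 0; v <= 10; v++) {
            int little = readLongSlotAsInt(v, ByteOrder.LITTLE_ENDIAN);
            int big = readLongSlotAsInt(v, ByteOrder.BIG_ENDIAN);
            System.out.println(v + " -> little-endian: " + little + ", big-endian: " + big);
        }
    }
}
```

With little-endian ordering the int read returns the original value for every input in this range, while with big-endian ordering it returns 0 each time, which matches the all-zero output seen on s390x. That is consistent with the theory but does not confirm where in Spark the mismatched read happens.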

The error looks like this:
{noformat}
- SPARK-20432: union one stream with itself *** FAILED ***
  Decoded objects do not match expected objects:
  expected: WrappedArray(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
  actual:   WrappedArray(0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0)
  assertnotnull(upcast(getcolumnbyordinal(0, LongType), LongType, - root class: "scala.Long"))
  +- upcast(getcolumnbyordinal(0, LongType), LongType, - root class: "scala.Long")
     +- getcolumnbyordinal(0, LongType) (QueryTest.scala:88)
{noformat}
Changing the {{FakeSource}} and {{FakeDefaultSource}} schemas to {{LongType}} fixes the issue:
{code:java}
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
@@ -45,7 +45,7 @@ import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources.StreamSourceProvider
 import org.apache.spark.sql.streaming.util.{BlockOnStopSourceProvider, StreamManualClock}
-import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
+import org.apache.spark.sql.types.{IntegerType, LongType, StructField, StructType}
 import org.apache.spark.util.Utils

 class StreamSuite extends StreamTest {
@@ -1265,7 +1265,7 @@ class StreamSuite extends StreamTest {
 }

 abstract class FakeSource extends StreamSourceProvider {
-  private val fakeSchema = StructType(StructField("a", IntegerType) :: Nil)
+  private val fakeSchema = StructType(StructField("a", LongType) :: Nil)

   override def sourceSchema(
       spark: SQLContext,
@@ -1287,7 +1287,7 @@ class FakeDefaultSource extends FakeSource {
     new Source {
       private var offset = -1L

-      override def schema: StructType = StructType(StructField("a", IntegerType) :: Nil)
+      override def schema: StructType = StructType(StructField("a", LongType) :: Nil)

       override def getOffset: Option[Offset] = {
         if (offset >= 10) {

{code}
Alternatively, removing {{.select("a")}} from the test also fixes the issue:
{code:java}
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
@@ -154,7 +154,7 @@ class StreamSuite extends StreamTest {
   }
 
   test("SPARK-20432: union one stream with itself") {
-    val df = spark.readStream.format(classOf[FakeDefaultSource].getName).load().select("a")
+    val df = spark.readStream.format(classOf[FakeDefaultSource].getName).load()
     val unioned = df.union(df)
     withTempDir { outputDir =>
       withTempDir { checkpointDir =>
{code}


