Posted to issues@spark.apache.org by "Yang Jie (Jira)" <ji...@apache.org> on 2020/10/10 09:37:00 UTC
[jira] [Commented] (SPARK-32929) StreamSuite failure on IBM Z: - SPARK-20432: union one stream with itself
[ https://issues.apache.org/jira/browse/SPARK-32929?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17211634#comment-17211634 ]
Yang Jie commented on SPARK-32929:
----------------------------------
This is a bit puzzling; Java itself should shield applications from the difference between big-endian and little-endian byte order.
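The JVM does make ordinary arithmetic endian-neutral, but that guarantee only holds when data is read back at the same width it was written. Reading the first four bytes of an eight-byte long as an int, which is roughly what unsafe row access would do if the schema claims IntegerType over long-backed data, is endian-sensitive. A minimal sketch of that effect using java.nio.ByteBuffer (the class and method here are illustrative, not Spark's actual code path):

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

public class EndianDemo {
    // Write an 8-byte long, then reinterpret its first 4 bytes as an int,
    // mimicking an unsafe int read against long-backed data.
    static int firstFourBytesAsInt(long value, ByteOrder order) {
        ByteBuffer buf = ByteBuffer.allocate(8).order(order);
        buf.putLong(0, value);
        return buf.getInt(0);
    }

    public static void main(String[] args) {
        // Little-endian: low-order bytes come first, so the int read sees 5.
        System.out.println(firstFourBytesAsInt(5L, ByteOrder.LITTLE_ENDIAN)); // 5
        // Big-endian: high-order (zero) bytes come first, so the int read sees 0.
        System.out.println(firstFourBytesAsInt(5L, ByteOrder.BIG_ENDIAN));    // 0
    }
}
```

On a big-endian machine the high-order zero bytes sit at the lower addresses, so every small long decodes to 0 under a 4-byte read, which would match the all-zero WrappedArray in the test failure below.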
> StreamSuite failure on IBM Z: - SPARK-20432: union one stream with itself
> -------------------------------------------------------------------------
>
> Key: SPARK-32929
> URL: https://issues.apache.org/jira/browse/SPARK-32929
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 3.0.1
> Environment: openjdk version "11.0.8" 2020-07-14
> OpenJDK Runtime Environment AdoptOpenJDK (build 11.0.8+10)
> OpenJDK 64-Bit Server VM AdoptOpenJDK (build 11.0.8+10, mixed mode)
> Linux 4.15.0-117-generic #118-Ubuntu SMP Fri Sep 4 20:00:20 UTC 2020 s390x s390x s390x GNU/Linux
> Reporter: Michael Munday
> Priority: Minor
> Labels: big-endian
>
> I am getting zeros in the output of this test on IBM Z. This is a big-endian system. See error below.
> I think this issue is related to the use of {{IntegerType}} in the schema for {{FakeDefaultSource}}. Modifying the schema to use {{LongType}} fixes the issue. Another workaround is to remove {{.select("a")}} (see patch below).
> My working theory is that long data (Range generates longs) is being read using unsafe int operations (as specified in the schema). This would appear to work on little-endian systems but not on big-endian systems. I'm still working out the exact mechanism, and I'd appreciate any hints or insights.
> The error looks like this:
> {noformat}
> - SPARK-20432: union one stream with itself *** FAILED ***
> Decoded objects do not match expected objects:
> expected: WrappedArray(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
> actual: WrappedArray(0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0)
> assertnotnull(upcast(getcolumnbyordinal(0, LongType), LongType, - root class: "scala.Long"))
> +- upcast(getcolumnbyordinal(0, LongType), LongType, - root class: "scala.Long")
> +- getcolumnbyordinal(0, LongType) (QueryTest.scala:88)
> {noformat}
> This change fixes the issue:
> {code:java}
> --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
> +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
> @@ -45,7 +45,7 @@ import org.apache.spark.sql.functions._
> import org.apache.spark.sql.internal.SQLConf
> import org.apache.spark.sql.sources.StreamSourceProvider
> import org.apache.spark.sql.streaming.util.{BlockOnStopSourceProvider, StreamManualClock}
> -import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
> +import org.apache.spark.sql.types.{IntegerType, LongType, StructField, StructType}
> import org.apache.spark.util.Utils
> class StreamSuite extends StreamTest {
> @@ -1265,7 +1265,7 @@ class StreamSuite extends StreamTest {
> }
> abstract class FakeSource extends StreamSourceProvider {
> - private val fakeSchema = StructType(StructField("a", IntegerType) :: Nil)
> + private val fakeSchema = StructType(StructField("a", LongType) :: Nil)
> override def sourceSchema(
> spark: SQLContext,
> @@ -1287,7 +1287,7 @@ class FakeDefaultSource extends FakeSource {
> new Source {
> private var offset = -1L
> - override def schema: StructType = StructType(StructField("a", IntegerType) :: Nil)
> + override def schema: StructType = StructType(StructField("a", LongType) :: Nil)
> override def getOffset: Option[Offset] = {
> if (offset >= 10) {
> {code}
> Alternatively, this change also fixes the issue:
> {code:java}
> --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
> +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
> @@ -154,7 +154,7 @@ class StreamSuite extends StreamTest {
> }
>
> test("SPARK-20432: union one stream with itself") {
> - val df = spark.readStream.format(classOf[FakeDefaultSource].getName).load().select("a")
> + val df = spark.readStream.format(classOf[FakeDefaultSource].getName).load()
> val unioned = df.union(df)
> withTempDir { outputDir =>
> withTempDir { checkpointDir =>
> {code}
--
This message was sent by Atlassian Jira
(v8.3.4#803005)