You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Joseph K. Bradley (JIRA)" <ji...@apache.org> on 2018/04/02 18:40:00 UTC
[jira] [Updated] (SPARK-23848) Structured Streaming fails with nested UDTs
[ https://issues.apache.org/jira/browse/SPARK-23848?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Joseph K. Bradley updated SPARK-23848:
--------------------------------------
Description:
While trying to write a test for org.apache.spark.ml.feature.MinHashLSHModel with Structured Streaming (for prediction in a streaming job), I ran into a bug which seems to indicate that nested UDTs don't work with streaming.
Here's a simplified version of the code:
{code}
package org.apache.spark.ml.feature
import org.apache.spark.ml.linalg.{Vector, Vectors}
import org.apache.spark.sql.{DataFrame, Dataset, Encoder, Row}
import org.apache.spark.sql.execution.streaming.MemoryStream
import org.apache.spark.sql.streaming.StreamTest
class MinHashLSHSuite extends StreamTest {
@transient var dataset: Dataset[_] = _
override def beforeAll(): Unit = {
super.beforeAll()
val data = {
for (i <- 0 to 95) yield Vectors.sparse(100, (i until i + 5).map((_, 1.0)))
}
dataset = spark.createDataFrame(data.map(Tuple1.apply)).toDF("keys")
}
test("a test") {
val localSpark = spark
import localSpark.implicits._
val df = Seq[(Int, Array[Vector])](
(1, Array(Vectors.dense(1.0, 2.0))),
(2, Array(Vectors.dense(1.1, 2.1)))
).toDF("a", "b")
df.show() // THIS SUCCEEDS
df.collect().foreach(println) // THIS SUCCEEDS
testTransformerOnStreamData[(Int, Array[Vector])](df) { rows => // THIS FAILS
rows.foreach {
case Row(a: Int, b: Array[_]) =>
}
}
}
def testTransformerOnStreamData[A : Encoder](
dataframe: DataFrame)
(globalCheckFunction: Seq[Row] => Unit): Unit = {
val stream = MemoryStream[A]
val streamDF = stream.toDS().toDF("a", "b")
val data = dataframe.as[A].collect()
val streamOutput = streamDF
.select("a", "b")
testStream(streamOutput) (
AddData(stream, data: _*),
CheckAnswer(globalCheckFunction)
)
}
}
{code}
The streaming test fails with stack trace:
{code}
[info] - a test *** FAILED *** (2 seconds, 325 milliseconds)
[info] scala.MatchError: [1,WrappedArray([1.0,2.0])] (of class org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema)
[info]
[info] == Progress ==
[info] AddData to MemoryStream[_1#24,_2#25]: (1,[Lorg.apache.spark.ml.linalg.Vector;@5abf84a9),(2,[Lorg.apache.spark.ml.linalg.Vector;@4b4198ba)
[info] => CheckAnswerByFunc
[info]
[info] == Stream ==
[info] Output Mode: Append
[info] Stream state: {MemoryStream[_1#24,_2#25]: 0}
[info] Thread state: alive
[info] Thread stack trace: java.lang.Thread.sleep(Native Method)
[info] org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:163)
[info] org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
[info] org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:131)
[info] org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:279)
[info] org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:189)
[info]
[info]
[info] == Sink ==
[info] 0: [1,WrappedArray([1.0,2.0])] [2,WrappedArray([1.1,2.1])]
[info]
[info]
[info] == Plan ==
[info] == Parsed Logical Plan ==
[info] Project [a#27, b#28]
[info] +- Project [_1#24 AS a#27, _2#25 AS b#28]
[info] +- Project [_1#36 AS _1#24, _2#37 AS _2#25]
[info] +- Streaming RelationV2 MemoryStreamDataSource[_1#36, _2#37]
[info]
[info] == Analyzed Logical Plan ==
[info] a: int, b: array<vector>
[info] Project [a#27, b#28]
[info] +- Project [_1#24 AS a#27, _2#25 AS b#28]
[info] +- Project [_1#36 AS _1#24, _2#37 AS _2#25]
[info] +- Streaming RelationV2 MemoryStreamDataSource[_1#36, _2#37]
[info]
[info] == Optimized Logical Plan ==
[info] Project [_1#36 AS a#27, _2#37 AS b#28]
[info] +- Streaming RelationV2 MemoryStreamDataSource[_1#36, _2#37]
[info]
[info] == Physical Plan ==
[info] *(1) Project [_1#36 AS a#27, _2#37 AS b#28]
[info] +- *(1) ScanV2 MemoryStreamDataSource[_1#36, _2#37] (StreamTest.scala:430)
[info] org.scalatest.exceptions.TestFailedException:
[info] at org.scalatest.Assertions$class.newAssertionFailedException(Assertions.scala:528)
[info] at org.scalatest.FunSuite.newAssertionFailedException(FunSuite.scala:1560)
[info] at org.scalatest.Assertions$class.fail(Assertions.scala:1089)
[info] at org.scalatest.FunSuite.fail(FunSuite.scala:1560)
[info] at org.apache.spark.sql.streaming.StreamTest$class.failTest$1(StreamTest.scala:430)
[info] at org.apache.spark.sql.streaming.StreamTest$class.executeAction$1(StreamTest.scala:683)
[info] at org.apache.spark.sql.streaming.StreamTest$$anonfun$liftedTree1$1$1.apply(StreamTest.scala:704)
[info] at org.apache.spark.sql.streaming.StreamTest$$anonfun$liftedTree1$1$1.apply(StreamTest.scala:693)
[info] at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
[info] at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
[info] at org.apache.spark.sql.streaming.StreamTest$class.liftedTree1$1(StreamTest.scala:693)
[info] at org.apache.spark.sql.streaming.StreamTest$class.testStream(StreamTest.scala:692)
[info] at org.apache.spark.ml.feature.MinHashLSHSuite.testStream(MinHashLSHSuite.scala:28)
[info] at org.apache.spark.ml.feature.MinHashLSHSuite.testTransformerOnStreamData(MinHashLSHSuite.scala:201)
[info] at org.apache.spark.ml.feature.MinHashLSHSuite$$anonfun$1.apply$mcV$sp(MinHashLSHSuite.scala:184)
[info] at org.apache.spark.ml.feature.MinHashLSHSuite$$anonfun$1.apply(MinHashLSHSuite.scala:174)
[info] at org.apache.spark.ml.feature.MinHashLSHSuite$$anonfun$1.apply(MinHashLSHSuite.scala:174)
[info] at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)
[info] at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
[info] at org.scalatest.Transformer.apply(Transformer.scala:22)
[info] at org.scalatest.Transformer.apply(Transformer.scala:20)
[info] at org.scalatest.FunSuiteLike$$anon$1.apply(FunSuiteLike.scala:186)
[info] at org.apache.spark.SparkFunSuite.withFixture(SparkFunSuite.scala:103)
[info] at org.scalatest.FunSuiteLike$class.invokeWithFixture$1(FunSuiteLike.scala:183)
[info] at org.scalatest.FunSuiteLike$$anonfun$runTest$1.apply(FunSuiteLike.scala:196)
[info] at org.scalatest.FunSuiteLike$$anonfun$runTest$1.apply(FunSuiteLike.scala:196)
[info] at org.scalatest.SuperEngine.runTestImpl(Engine.scala:289)
[info] at org.scalatest.FunSuiteLike$class.runTest(FunSuiteLike.scala:196)
[info] at org.apache.spark.ml.feature.MinHashLSHSuite.org$scalatest$BeforeAndAfterEach$$super$runTest(MinHashLSHSuite.scala:28)
[info] at org.scalatest.BeforeAndAfterEach$class.runTest(BeforeAndAfterEach.scala:221)
[info] at org.apache.spark.ml.feature.MinHashLSHSuite.runTest(MinHashLSHSuite.scala:28)
[info] at org.scalatest.FunSuiteLike$$anonfun$runTests$1.apply(FunSuiteLike.scala:229)
[info] at org.scalatest.FunSuiteLike$$anonfun$runTests$1.apply(FunSuiteLike.scala:229)
[info] at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:396)
[info] at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:384)
[info] at scala.collection.immutable.List.foreach(List.scala:381)
[info] at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:384)
[info] at org.scalatest.SuperEngine.org$scalatest$SuperEngine$$runTestsInBranch(Engine.scala:379)
[info] at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:461)
[info] at org.scalatest.FunSuiteLike$class.runTests(FunSuiteLike.scala:229)
[info] at org.scalatest.FunSuite.runTests(FunSuite.scala:1560)
[info] at org.scalatest.Suite$class.run(Suite.scala:1147)
[info] at org.scalatest.FunSuite.org$scalatest$FunSuiteLike$$super$run(FunSuite.scala:1560)
[info] at org.scalatest.FunSuiteLike$$anonfun$run$1.apply(FunSuiteLike.scala:233)
[info] at org.scalatest.FunSuiteLike$$anonfun$run$1.apply(FunSuiteLike.scala:233)
[info] at org.scalatest.SuperEngine.runImpl(Engine.scala:521)
[info] at org.scalatest.FunSuiteLike$class.run(FunSuiteLike.scala:233)
[info] at org.apache.spark.SparkFunSuite.org$scalatest$BeforeAndAfterAll$$super$run(SparkFunSuite.scala:52)
[info] at org.scalatest.BeforeAndAfterAll$class.liftedTree1$1(BeforeAndAfterAll.scala:213)
[info] at org.scalatest.BeforeAndAfterAll$class.run(BeforeAndAfterAll.scala:210)
[info] at org.apache.spark.SparkFunSuite.run(SparkFunSuite.scala:52)
[info] at org.scalatest.tools.Framework.org$scalatest$tools$Framework$$runSuite(Framework.scala:314)
[info] at org.scalatest.tools.Framework$ScalaTestTask.execute(Framework.scala:480)
[info] at sbt.ForkMain$Run$2.call(ForkMain.java:296)
[info] at sbt.ForkMain$Run$2.call(ForkMain.java:286)
[info] at java.util.concurrent.FutureTask.run(FutureTask.java:266)
[info] at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
[info] at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
[info] at java.lang.Thread.run(Thread.java:745)
{code}
was:
While trying to write a test for org.apache.spark.ml.feature.MinHashLSHModel with Structured Streaming (for prediction in a streaming job), I ran into a bug which seems to indicate that nested UDTs don't work with streaming.
Here's a simplified version of the code:
{code}
package org.apache.spark.ml.feature
import org.apache.spark.ml.linalg.{Vector, Vectors}
import org.apache.spark.sql.{DataFrame, Dataset, Encoder, Row}
import org.apache.spark.sql.execution.streaming.MemoryStream
import org.apache.spark.sql.streaming.StreamTest
class MinHashLSHSuite extends StreamTest {
@transient var dataset: Dataset[_] = _
override def beforeAll(): Unit = {
super.beforeAll()
val data = {
for (i <- 0 to 95) yield Vectors.sparse(100, (i until i + 5).map((_, 1.0)))
}
dataset = spark.createDataFrame(data.map(Tuple1.apply)).toDF("keys")
}
test("a test") {
val localSpark = spark
import localSpark.implicits._
val df = Seq[(Int, Array[Vector])](
(1, Array(Vectors.dense(1.0, 2.0))),
(2, Array(Vectors.dense(1.1, 2.1)))
).toDF("a", "b")
df.show() // THIS SUCCEEDS
df.collect().foreach(println) // THIS SUCCEEDS
testTransformerOnStreamData[(Int, Array[Vector])](df) { // THIS FAILS
case Row(a: Int, b: Array[_]) =>
}
}
def testTransformerOnStreamData[A : Encoder](
dataframe: DataFrame)
(globalCheckFunction: Seq[Row] => Unit): Unit = {
val stream = MemoryStream[A]
val streamDF = stream.toDS().toDF("a", "b")
val data = dataframe.as[A].collect()
val streamOutput = streamDF
.select("a", "b")
testStream(streamOutput) (
AddData(stream, data: _*),
CheckAnswer(globalCheckFunction)
)
}
}
{code}
The streaming test fails with stack trace:
{code}
[info] - a test *** FAILED *** (2 seconds, 174 milliseconds)
[info] scala.MatchError: ArrayBuffer([1,WrappedArray([1.0,2.0])], [2,WrappedArray([1.1,2.1])]) (of class scala.collection.mutable.ArrayBuffer)
[info]
[info] == Progress ==
[info] AddData to MemoryStream[_1#24,_2#25]: (1,[Lorg.apache.spark.ml.linalg.Vector;@1833a395),(2,[Lorg.apache.spark.ml.linalg.Vector;@72cb7c8f)
[info] => CheckAnswerByFunc
[info]
[info] == Stream ==
[info] Output Mode: Append
[info] Stream state: {MemoryStream[_1#24,_2#25]: 0}
[info] Thread state: alive
[info] Thread stack trace: java.lang.Thread.sleep(Native Method)
[info] org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:163)
[info] org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
[info] org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:131)
[info] org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:279)
[info] org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:189)
[info]
[info]
[info] == Sink ==
[info] 0: [1,WrappedArray([1.0,2.0])] [2,WrappedArray([1.1,2.1])]
[info]
[info]
[info] == Plan ==
[info] == Parsed Logical Plan ==
[info] Project [a#27, b#28]
[info] +- Project [_1#24 AS a#27, _2#25 AS b#28]
[info] +- Project [_1#36 AS _1#24, _2#37 AS _2#25]
[info] +- Streaming RelationV2 MemoryStreamDataSource[_1#36, _2#37]
[info]
[info] == Analyzed Logical Plan ==
[info] a: int, b: array<vector>
[info] Project [a#27, b#28]
[info] +- Project [_1#24 AS a#27, _2#25 AS b#28]
[info] +- Project [_1#36 AS _1#24, _2#37 AS _2#25]
[info] +- Streaming RelationV2 MemoryStreamDataSource[_1#36, _2#37]
[info]
[info] == Optimized Logical Plan ==
[info] Project [_1#36 AS a#27, _2#37 AS b#28]
[info] +- Streaming RelationV2 MemoryStreamDataSource[_1#36, _2#37]
[info]
[info] == Physical Plan ==
[info] *(1) Project [_1#36 AS a#27, _2#37 AS b#28]
[info] +- *(1) ScanV2 MemoryStreamDataSource[_1#36, _2#37] (StreamTest.scala:430)
[info] org.scalatest.exceptions.TestFailedException:
[info] at org.scalatest.Assertions$class.newAssertionFailedException(Assertions.scala:528)
[info] at org.scalatest.FunSuite.newAssertionFailedException(FunSuite.scala:1560)
[info] at org.scalatest.Assertions$class.fail(Assertions.scala:1089)
[info] at org.scalatest.FunSuite.fail(FunSuite.scala:1560)
[info] at org.apache.spark.sql.streaming.StreamTest$class.failTest$1(StreamTest.scala:430)
[info] at org.apache.spark.sql.streaming.StreamTest$class.executeAction$1(StreamTest.scala:683)
[info] at org.apache.spark.sql.streaming.StreamTest$$anonfun$liftedTree1$1$1.apply(StreamTest.scala:704)
[info] at org.apache.spark.sql.streaming.StreamTest$$anonfun$liftedTree1$1$1.apply(StreamTest.scala:693)
[info] at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
[info] at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
[info] at org.apache.spark.sql.streaming.StreamTest$class.liftedTree1$1(StreamTest.scala:693)
[info] at org.apache.spark.sql.streaming.StreamTest$class.testStream(StreamTest.scala:692)
[info] at org.apache.spark.ml.feature.MinHashLSHSuite.testStream(MinHashLSHSuite.scala:28)
[info] at org.apache.spark.ml.feature.MinHashLSHSuite.testTransformerOnStreamData(MinHashLSHSuite.scala:199)
[info] at org.apache.spark.ml.feature.MinHashLSHSuite$$anonfun$1.apply$mcV$sp(MinHashLSHSuite.scala:184)
[info] at org.apache.spark.ml.feature.MinHashLSHSuite$$anonfun$1.apply(MinHashLSHSuite.scala:174)
[info] at org.apache.spark.ml.feature.MinHashLSHSuite$$anonfun$1.apply(MinHashLSHSuite.scala:174)
[info] at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)
[info] at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
[info] at org.scalatest.Transformer.apply(Transformer.scala:22)
[info] at org.scalatest.Transformer.apply(Transformer.scala:20)
[info] at org.scalatest.FunSuiteLike$$anon$1.apply(FunSuiteLike.scala:186)
[info] at org.apache.spark.SparkFunSuite.withFixture(SparkFunSuite.scala:103)
[info] at org.scalatest.FunSuiteLike$class.invokeWithFixture$1(FunSuiteLike.scala:183)
[info] at org.scalatest.FunSuiteLike$$anonfun$runTest$1.apply(FunSuiteLike.scala:196)
[info] at org.scalatest.FunSuiteLike$$anonfun$runTest$1.apply(FunSuiteLike.scala:196)
[info] at org.scalatest.SuperEngine.runTestImpl(Engine.scala:289)
[info] at org.scalatest.FunSuiteLike$class.runTest(FunSuiteLike.scala:196)
[info] at org.apache.spark.ml.feature.MinHashLSHSuite.org$scalatest$BeforeAndAfterEach$$super$runTest(MinHashLSHSuite.scala:28)
[info] at org.scalatest.BeforeAndAfterEach$class.runTest(BeforeAndAfterEach.scala:221)
[info] at org.apache.spark.ml.feature.MinHashLSHSuite.runTest(MinHashLSHSuite.scala:28)
[info] at org.scalatest.FunSuiteLike$$anonfun$runTests$1.apply(FunSuiteLike.scala:229)
[info] at org.scalatest.FunSuiteLike$$anonfun$runTests$1.apply(FunSuiteLike.scala:229)
[info] at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:396)
[info] at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:384)
[info] at scala.collection.immutable.List.foreach(List.scala:381)
[info] at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:384)
[info] at org.scalatest.SuperEngine.org$scalatest$SuperEngine$$runTestsInBranch(Engine.scala:379)
[info] at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:461)
[info] at org.scalatest.FunSuiteLike$class.runTests(FunSuiteLike.scala:229)
[info] at org.scalatest.FunSuite.runTests(FunSuite.scala:1560)
[info] at org.scalatest.Suite$class.run(Suite.scala:1147)
[info] at org.scalatest.FunSuite.org$scalatest$FunSuiteLike$$super$run(FunSuite.scala:1560)
[info] at org.scalatest.FunSuiteLike$$anonfun$run$1.apply(FunSuiteLike.scala:233)
[info] at org.scalatest.FunSuiteLike$$anonfun$run$1.apply(FunSuiteLike.scala:233)
[info] at org.scalatest.SuperEngine.runImpl(Engine.scala:521)
[info] at org.scalatest.FunSuiteLike$class.run(FunSuiteLike.scala:233)
[info] at org.apache.spark.SparkFunSuite.org$scalatest$BeforeAndAfterAll$$super$run(SparkFunSuite.scala:52)
[info] at org.scalatest.BeforeAndAfterAll$class.liftedTree1$1(BeforeAndAfterAll.scala:213)
[info] at org.scalatest.BeforeAndAfterAll$class.run(BeforeAndAfterAll.scala:210)
[info] at org.apache.spark.SparkFunSuite.run(SparkFunSuite.scala:52)
[info] at org.scalatest.tools.Framework.org$scalatest$tools$Framework$$runSuite(Framework.scala:314)
[info] at org.scalatest.tools.Framework$ScalaTestTask.execute(Framework.scala:480)
[info] at sbt.ForkMain$Run$2.call(ForkMain.java:296)
[info] at sbt.ForkMain$Run$2.call(ForkMain.java:286)
[info] at java.util.concurrent.FutureTask.run(FutureTask.java:266)
[info] at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
[info] at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
[info] at java.lang.Thread.run(Thread.java:745)
{code}
> Structured Streaming fails with nested UDTs
> -------------------------------------------
>
> Key: SPARK-23848
> URL: https://issues.apache.org/jira/browse/SPARK-23848
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 2.3.0
> Reporter: Joseph K. Bradley
> Priority: Major
>
> While trying to write a test for org.apache.spark.ml.feature.MinHashLSHModel with Structured Streaming (for prediction in a streaming job), I ran into a bug which seems to indicate that nested UDTs don't work with streaming.
> Here's a simplified version of the code:
> {code}
> package org.apache.spark.ml.feature
> import org.apache.spark.ml.linalg.{Vector, Vectors}
> import org.apache.spark.sql.{DataFrame, Dataset, Encoder, Row}
> import org.apache.spark.sql.execution.streaming.MemoryStream
> import org.apache.spark.sql.streaming.StreamTest
> class MinHashLSHSuite extends StreamTest {
> @transient var dataset: Dataset[_] = _
> override def beforeAll(): Unit = {
> super.beforeAll()
> val data = {
> for (i <- 0 to 95) yield Vectors.sparse(100, (i until i + 5).map((_, 1.0)))
> }
> dataset = spark.createDataFrame(data.map(Tuple1.apply)).toDF("keys")
> }
> test("a test") {
> val localSpark = spark
> import localSpark.implicits._
> val df = Seq[(Int, Array[Vector])](
> (1, Array(Vectors.dense(1.0, 2.0))),
> (2, Array(Vectors.dense(1.1, 2.1)))
> ).toDF("a", "b")
> df.show() // THIS SUCCEEDS
> df.collect().foreach(println) // THIS SUCCEEDS
> testTransformerOnStreamData[(Int, Array[Vector])](df) { rows => // THIS FAILS
> rows.foreach {
> case Row(a: Int, b: Array[_]) =>
> }
> }
> }
> def testTransformerOnStreamData[A : Encoder](
> dataframe: DataFrame)
> (globalCheckFunction: Seq[Row] => Unit): Unit = {
> val stream = MemoryStream[A]
> val streamDF = stream.toDS().toDF("a", "b")
> val data = dataframe.as[A].collect()
> val streamOutput = streamDF
> .select("a", "b")
> testStream(streamOutput) (
> AddData(stream, data: _*),
> CheckAnswer(globalCheckFunction)
> )
> }
> }
> {code}
> The streaming test fails with stack trace:
> {code}
> [info] - a test *** FAILED *** (2 seconds, 325 milliseconds)
> [info] scala.MatchError: [1,WrappedArray([1.0,2.0])] (of class org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema)
> [info]
> [info] == Progress ==
> [info] AddData to MemoryStream[_1#24,_2#25]: (1,[Lorg.apache.spark.ml.linalg.Vector;@5abf84a9),(2,[Lorg.apache.spark.ml.linalg.Vector;@4b4198ba)
> [info] => CheckAnswerByFunc
> [info]
> [info] == Stream ==
> [info] Output Mode: Append
> [info] Stream state: {MemoryStream[_1#24,_2#25]: 0}
> [info] Thread state: alive
> [info] Thread stack trace: java.lang.Thread.sleep(Native Method)
> [info] org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:163)
> [info] org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
> [info] org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:131)
> [info] org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:279)
> [info] org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:189)
> [info]
> [info]
> [info] == Sink ==
> [info] 0: [1,WrappedArray([1.0,2.0])] [2,WrappedArray([1.1,2.1])]
> [info]
> [info]
> [info] == Plan ==
> [info] == Parsed Logical Plan ==
> [info] Project [a#27, b#28]
> [info] +- Project [_1#24 AS a#27, _2#25 AS b#28]
> [info] +- Project [_1#36 AS _1#24, _2#37 AS _2#25]
> [info] +- Streaming RelationV2 MemoryStreamDataSource[_1#36, _2#37]
> [info]
> [info] == Analyzed Logical Plan ==
> [info] a: int, b: array<vector>
> [info] Project [a#27, b#28]
> [info] +- Project [_1#24 AS a#27, _2#25 AS b#28]
> [info] +- Project [_1#36 AS _1#24, _2#37 AS _2#25]
> [info] +- Streaming RelationV2 MemoryStreamDataSource[_1#36, _2#37]
> [info]
> [info] == Optimized Logical Plan ==
> [info] Project [_1#36 AS a#27, _2#37 AS b#28]
> [info] +- Streaming RelationV2 MemoryStreamDataSource[_1#36, _2#37]
> [info]
> [info] == Physical Plan ==
> [info] *(1) Project [_1#36 AS a#27, _2#37 AS b#28]
> [info] +- *(1) ScanV2 MemoryStreamDataSource[_1#36, _2#37] (StreamTest.scala:430)
> [info] org.scalatest.exceptions.TestFailedException:
> [info] at org.scalatest.Assertions$class.newAssertionFailedException(Assertions.scala:528)
> [info] at org.scalatest.FunSuite.newAssertionFailedException(FunSuite.scala:1560)
> [info] at org.scalatest.Assertions$class.fail(Assertions.scala:1089)
> [info] at org.scalatest.FunSuite.fail(FunSuite.scala:1560)
> [info] at org.apache.spark.sql.streaming.StreamTest$class.failTest$1(StreamTest.scala:430)
> [info] at org.apache.spark.sql.streaming.StreamTest$class.executeAction$1(StreamTest.scala:683)
> [info] at org.apache.spark.sql.streaming.StreamTest$$anonfun$liftedTree1$1$1.apply(StreamTest.scala:704)
> [info] at org.apache.spark.sql.streaming.StreamTest$$anonfun$liftedTree1$1$1.apply(StreamTest.scala:693)
> [info] at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> [info] at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
> [info] at org.apache.spark.sql.streaming.StreamTest$class.liftedTree1$1(StreamTest.scala:693)
> [info] at org.apache.spark.sql.streaming.StreamTest$class.testStream(StreamTest.scala:692)
> [info] at org.apache.spark.ml.feature.MinHashLSHSuite.testStream(MinHashLSHSuite.scala:28)
> [info] at org.apache.spark.ml.feature.MinHashLSHSuite.testTransformerOnStreamData(MinHashLSHSuite.scala:201)
> [info] at org.apache.spark.ml.feature.MinHashLSHSuite$$anonfun$1.apply$mcV$sp(MinHashLSHSuite.scala:184)
> [info] at org.apache.spark.ml.feature.MinHashLSHSuite$$anonfun$1.apply(MinHashLSHSuite.scala:174)
> [info] at org.apache.spark.ml.feature.MinHashLSHSuite$$anonfun$1.apply(MinHashLSHSuite.scala:174)
> [info] at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)
> [info] at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
> [info] at org.scalatest.Transformer.apply(Transformer.scala:22)
> [info] at org.scalatest.Transformer.apply(Transformer.scala:20)
> [info] at org.scalatest.FunSuiteLike$$anon$1.apply(FunSuiteLike.scala:186)
> [info] at org.apache.spark.SparkFunSuite.withFixture(SparkFunSuite.scala:103)
> [info] at org.scalatest.FunSuiteLike$class.invokeWithFixture$1(FunSuiteLike.scala:183)
> [info] at org.scalatest.FunSuiteLike$$anonfun$runTest$1.apply(FunSuiteLike.scala:196)
> [info] at org.scalatest.FunSuiteLike$$anonfun$runTest$1.apply(FunSuiteLike.scala:196)
> [info] at org.scalatest.SuperEngine.runTestImpl(Engine.scala:289)
> [info] at org.scalatest.FunSuiteLike$class.runTest(FunSuiteLike.scala:196)
> [info] at org.apache.spark.ml.feature.MinHashLSHSuite.org$scalatest$BeforeAndAfterEach$$super$runTest(MinHashLSHSuite.scala:28)
> [info] at org.scalatest.BeforeAndAfterEach$class.runTest(BeforeAndAfterEach.scala:221)
> [info] at org.apache.spark.ml.feature.MinHashLSHSuite.runTest(MinHashLSHSuite.scala:28)
> [info] at org.scalatest.FunSuiteLike$$anonfun$runTests$1.apply(FunSuiteLike.scala:229)
> [info] at org.scalatest.FunSuiteLike$$anonfun$runTests$1.apply(FunSuiteLike.scala:229)
> [info] at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:396)
> [info] at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:384)
> [info] at scala.collection.immutable.List.foreach(List.scala:381)
> [info] at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:384)
> [info] at org.scalatest.SuperEngine.org$scalatest$SuperEngine$$runTestsInBranch(Engine.scala:379)
> [info] at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:461)
> [info] at org.scalatest.FunSuiteLike$class.runTests(FunSuiteLike.scala:229)
> [info] at org.scalatest.FunSuite.runTests(FunSuite.scala:1560)
> [info] at org.scalatest.Suite$class.run(Suite.scala:1147)
> [info] at org.scalatest.FunSuite.org$scalatest$FunSuiteLike$$super$run(FunSuite.scala:1560)
> [info] at org.scalatest.FunSuiteLike$$anonfun$run$1.apply(FunSuiteLike.scala:233)
> [info] at org.scalatest.FunSuiteLike$$anonfun$run$1.apply(FunSuiteLike.scala:233)
> [info] at org.scalatest.SuperEngine.runImpl(Engine.scala:521)
> [info] at org.scalatest.FunSuiteLike$class.run(FunSuiteLike.scala:233)
> [info] at org.apache.spark.SparkFunSuite.org$scalatest$BeforeAndAfterAll$$super$run(SparkFunSuite.scala:52)
> [info] at org.scalatest.BeforeAndAfterAll$class.liftedTree1$1(BeforeAndAfterAll.scala:213)
> [info] at org.scalatest.BeforeAndAfterAll$class.run(BeforeAndAfterAll.scala:210)
> [info] at org.apache.spark.SparkFunSuite.run(SparkFunSuite.scala:52)
> [info] at org.scalatest.tools.Framework.org$scalatest$tools$Framework$$runSuite(Framework.scala:314)
> [info] at org.scalatest.tools.Framework$ScalaTestTask.execute(Framework.scala:480)
> [info] at sbt.ForkMain$Run$2.call(ForkMain.java:296)
> [info] at sbt.ForkMain$Run$2.call(ForkMain.java:286)
> [info] at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> [info] at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> [info] at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> [info] at java.lang.Thread.run(Thread.java:745)
> {code}
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org