Posted to user@spark.apache.org by kineret M <ki...@gmail.com> on 2019/03/21 17:40:53 UTC
Spark streaming error - Query terminated with exception: assertion failed: Invalid batch: a#660,b#661L,c#662,d#663,,… 26 more fields != b#1291L
I am trying to read a stream using my custom data source (DataSource V2, Spark 2.3),
and it fails *in the second iteration* with the following exception while
reading pruned columns:

Query [id=xxx, runId=yyy] terminated with exception: assertion failed: Invalid batch: a#660,b#661L,c#662,d#663,,... 26 more fields != b#1291L
DataFrame creation:

val df = sparkSession.readStream.format("myV2Source").load("/")
val df1 = df.filter(df("a") >= "-1").select("b")

Stream execution:

import org.apache.spark.sql.streaming.{OutputMode, Trigger}

val streamingQuery = df1
  .writeStream
  .format("console")
  .trigger(Trigger.ProcessingTime("10 seconds"))
  .outputMode(OutputMode.Append())
  .start()

streamingQuery.awaitTermination()

If I remove the select (i.e. val df1 = df.filter(df("a") >= "-1")), it
works fine.
Any idea why?
Re: Spark streaming error - Query terminated with exception: assertion failed: Invalid batch: a#660,b#661L,c#662,d#663,,… 26 more fields != b#1291L
Posted by "Shixiong(Ryan) Zhu" <sh...@databricks.com>.
Could you try using $"a" rather than df("a")? The latter sometimes
doesn't work.
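A sketch of this suggestion applied to the original query. It assumes Spark 2.3 on the classpath and the poster's custom "myV2Source" provider registered (that source is not shown in the thread). The names PrunedStreamQuery, filterAndPrune and start are hypothetical. The only change from the original is swapping df("a") for $"a"; the thread does not confirm that this avoids the assertion.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.streaming.{OutputMode, StreamingQuery, Trigger}

// Hypothetical wrapper object; not part of the original post.
object PrunedStreamQuery {

  // Same filter + projection as the original post, but with the
  // name-based $"a" reference instead of df("a").
  def filterAndPrune(df: DataFrame): DataFrame = {
    // Brings the $"..." column interpolator into scope for df's session.
    import df.sparkSession.implicits._
    // $"a" builds an unresolved, name-based column that the analyzer resolves
    // against the plan it ends up in; df("a") is resolved immediately against
    // df's own analyzed plan.
    df.filter($"a" >= "-1").select("b")
  }

  def start(sparkSession: SparkSession): StreamingQuery = {
    // "myV2Source" is the poster's custom DataSource V2 provider (assumed registered).
    val df = sparkSession.readStream.format("myV2Source").load("/")
    filterAndPrune(df)
      .writeStream
      .format("console")
      .trigger(Trigger.ProcessingTime("10 seconds"))
      .outputMode(OutputMode.Append())
      .start()
  }

  def main(args: Array[String]): Unit = {
    // Master URL is expected to come from spark-submit.
    val sparkSession = SparkSession.builder()
      .appName("pruned-stream-query")
      .getOrCreate()
    start(sparkSession).awaitTermination()
  }
}
```

Splitting the filter/projection into its own function lets it be checked on a small static DataFrame before it is wired to the stream.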
--
Best Regards,
Ryan