Posted to user@spark.apache.org by kineret M <ki...@gmail.com> on 2019/03/21 17:40:53 UTC

Spark streaming error - Query terminated with exception: assertion failed: Invalid batch: a#660,b#661L,c#662,d#663,,… 26 more fields != b#1291L

I am trying to read a stream using my custom data source (v2, Spark 2.3),
and it fails *in the second iteration* with the following exception while
reading pruned columns:

Query [id=xxx, runId=yyy] terminated with exception: assertion failed:
Invalid batch: a#660,b#661L,c#662,d#663,,... 26 more fields != b#1291L

DataFrame creation:

val df = sparkSession.readStream.format("myV2Source").load("/")
val df1 = df.filter(df("a") >= "-1").select("b")

Stream execution:

import org.apache.spark.sql.streaming.{OutputMode, Trigger}

val streamingQuery = df1
  .writeStream
  .format("console")
  .trigger(Trigger.ProcessingTime("10 seconds"))
  .outputMode(OutputMode.Append())
  .start()

streamingQuery.awaitTermination()

If I remove the select (i.e. val df1 = df.filter(df("a") >= "-1")), it
works fine.

Any idea why?

Re: Spark streaming error - Query terminated with exception: assertion failed: Invalid batch: a#660,b#661L,c#662,d#663,,… 26 more fields != b#1291L

Posted by "Shixiong(Ryan) Zhu" <sh...@databricks.com>.
Could you try using $"a" rather than df("a")? The latter sometimes
doesn't work.
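
For reference, here is a minimal sketch of what that change might look like.
It assumes the same "myV2Source" format and "/" path from the original
message; the object name, app name, and local master setting are
hypothetical and only there to make the snippet self-contained. df("a")
resolves the column against df's own plan at the point of the call, whereas
$"a" and col("a") stay unresolved by name until the query is analyzed.
Whether this avoids the "Invalid batch" assertion with a custom v2 source is
not confirmed in this thread.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

object ColumnReferenceSketch {
  def main(args: Array[String]): Unit = {
    // Hypothetical session setup; adjust appName/master for your environment.
    val sparkSession = SparkSession.builder()
      .appName("column-reference-sketch")
      .master("local[*]")
      .getOrCreate()

    // Brings the $"..." column syntax into scope.
    import sparkSession.implicits._

    // "myV2Source" is the custom data source from the original message.
    val df = sparkSession.readStream.format("myV2Source").load("/")

    // Name-based column reference instead of df("a").
    val df1 = df.filter($"a" >= "-1").select("b")

    // Equivalent name-based form that does not need the implicits import.
    val df1Alt = df.filter(col("a") >= "-1").select("b")
  }
}
```

Either df1 or df1Alt can then be passed to writeStream exactly as in the
original stream execution code.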

-- 

Best Regards,
Ryan