Posted to dev@spark.apache.org by kordex <ko...@gmail.com> on 2021/03/04 14:41:22 UTC

using accumulators in (MicroBatch) InputPartitionReader

I tried to create a data source, but our use case is a bit tricky: we
only know the available offsets within the tasks, not on the driver. I
therefore planned to use accumulators in the InputPartitionReader, but
they do not seem to work.

Example accumulation is done here
https://github.com/kortemik/spark-source/blob/master/src/main/java/com/teragrep/pth06/ArchiveMicroBatchInputPartitionReader.java#L118
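Boiled down, the pattern looks roughly like this (a minimal sketch with
invented names, omitting the InputPartitionReader plumbing; it is not
the actual code from the repository):

import java.io.IOException;

import org.apache.spark.sql.SparkSession;
import org.apache.spark.util.LongAccumulator;

// Sketch of the attempted pattern: accumulator created on the driver,
// incremented per record on the executor side.
public class AccumulatorSketch {

    // Driver side: register a named accumulator with the SparkContext
    // and pass it (serialized) into the input partition.
    public static LongAccumulator createOnDriver(SparkSession spark) {
        return spark.sparkContext().longAccumulator("recordsReadInBatch");
    }

    // Executor side: roughly what the reader's next() does per record.
    public static boolean readOneRecord(LongAccumulator recordsRead) throws IOException {
        // ... decompress and read one record from the gzipped blob ...
        recordsRead.add(1L); // update happens on the executor; the driver-side
                             // copy is only merged once the task completes
        return true;
    }
}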

The task logs show that the System.out.println() calls are executed, so
the flow itself cannot be broken, but the accumulators only seem to work
while on the driver, as can be seen from the logs at
https://github.com/kortemik/spark-source/tree/master

Is it intentional that the accumulators do not work within the data source?

One might ask why all this is needed, so here is a brief explanation. We
use gzipped files as the storage blobs, and it is unknown prior to
execution how many records they contain. Of course this could be
mitigated by decompressing the files on the driver and then sending the
offsets to the executors, but that means decompressing everything twice.
The aim was instead to decompress each file only once: the reader does a
forward-lookup into the data and uses an accumulator to tell the driver
either that there is still data available for the next batch, or that
the file is done and the driver needs to pull the next one to keep the
executors busy.
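To make the forward-lookup idea concrete, here is a rough sketch of the
look-ahead pattern over a gzip stream (a hypothetical class, and it
assumes newline-delimited records, which may not match our actual
format):

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPInputStream;

// Decompress once and keep one record buffered ahead, so the reader
// always knows whether the blob still has data for the next batch.
public class LookAheadGzipReader {
    private final BufferedReader reader;
    private String buffered; // the record read ahead, null when exhausted

    public LookAheadGzipReader(InputStream compressed) throws IOException {
        this.reader = new BufferedReader(new InputStreamReader(
                new GZIPInputStream(compressed), StandardCharsets.UTF_8));
        this.buffered = reader.readLine(); // prime the look-ahead
    }

    // This is the piece of information the driver would need to know.
    public boolean hasMore() {
        return buffered != null;
    }

    public String next() throws IOException {
        String current = buffered;
        buffered = reader.readLine(); // look ahead for the following record
        return current;
    }
}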

Any advice is welcome.

Kind regards,
-Mikko Kortelainen

---------------------------------------------------------------------
To unsubscribe e-mail: dev-unsubscribe@spark.apache.org


Re: using accumulators in (MicroBatch) InputPartitionReader

Posted by Jungtaek Lim <ka...@gmail.com>.
I'm not sure about the accumulator approach; one possible approach that
might work (DISCLAIMER: a random thought) would be to employ an RPC
endpoint on the driver side which receives such information from the
executors and acts as a coordinator.

Beware that Spark's RPC implementation is package private, so you may
need to resort to some hacks (e.g. matching the package name) and deal
with breakage across Spark versions, since Spark doesn't guarantee
backward compatibility for it. If you can build something similar with
only lightweight dependencies that don't cause conflicts, I guess that
would work as well.
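As an illustration of the coordinator idea without touching Spark's
internal RPC, a dependency-free stand-in could be built on the JDK's
own HTTP server. This is only a rough sketch: the port, path, payload
format and how the executors learn the driver's address are all made up.

import com.sun.net.httpserver.HttpServer;

import java.io.IOException;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.InetSocketAddress;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ConcurrentHashMap;

// Driver side: a tiny coordinator that records, per file, whether the
// executors reported more data to be available.
public class OffsetCoordinator {
    private final ConcurrentHashMap<String, String> reports = new ConcurrentHashMap<>();

    public void start(int port) throws IOException {
        HttpServer server = HttpServer.create(new InetSocketAddress(port), 0);
        server.createContext("/report", exchange -> {
            String body = new String(exchange.getRequestBody().readAllBytes(),
                    StandardCharsets.UTF_8);            // e.g. "file-123=more"
            String[] kv = body.split("=", 2);
            reports.put(kv[0], kv.length > 1 ? kv[1] : "");
            exchange.sendResponseHeaders(200, -1);      // ack, no response body
            exchange.close();
        });
        server.start();
    }

    // Consulted by the driver when planning the offsets of the next batch.
    public String statusOf(String file) {
        return reports.get(file);
    }

    // Executor side: the partition reader posts its findings to the driver.
    public static void report(String driverHost, int port, String file, String status)
            throws IOException {
        byte[] payload = (file + "=" + status).getBytes(StandardCharsets.UTF_8);
        HttpURLConnection conn = (HttpURLConnection)
                new URL("http://" + driverHost + ":" + port + "/report").openConnection();
        conn.setDoOutput(true);
        conn.setRequestMethod("POST");
        try (OutputStream out = conn.getOutputStream()) {
            out.write(payload);
        }
        conn.getResponseCode();                          // wait for the driver's ack
        conn.disconnect();
    }
}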
