Posted to user@beam.apache.org by Mike Kaplinskiy <mi...@ladderlife.com> on 2018/10/11 20:05:02 UTC

Spark storageLevel not taking effect

Hey folks,

Admittedly I may be a bit on the bleeding edge here, but I'm attempting to
run a Beam pipeline on Spark which is running on top of Kubernetes.
Specifically Beam 2.6.0 with Spark 2.4.0-rc2 running in client mode with a
Kubernetes (1.11) driver. It's actually pretty cool - from a Kubernetes
perspective, I start a pod which starts a ton of workers to do the parallel
stuff and then cleans up after itself.

One thing I can't seem to get working is setting the storage level for
Spark RDDs via Beam. Specifically passing --storageLevel=MEMORY_AND_DISK
seems to not work - the RDD still shows up as "Memory Deserialized 1x
Replicated" in the Spark UI. I would expect it to be something closer to
"Disk Memory Deserialized 1x Replicated." It *seems* to be serialized only
in the sense that less memory is used (I assume it gets encoded).
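
For readers comparing the two Spark UI labels, here is a minimal sketch of how those strings appear to be assembled from a storage level's flags. It is modeled on my understanding of Spark's StorageLevel.description in the 2.x line, not copied from the source, and the Python class, field names, and LEVELS table are hypothetical stand-ins for illustration only.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class StorageLevel:
    """Hypothetical stand-in for the flags behind a Spark storage level."""
    use_disk: bool
    use_memory: bool
    use_off_heap: bool
    deserialized: bool
    replication: int = 1

    def description(self) -> str:
        # Assumed label order: disk, memory, (de)serialized, replication.
        parts = []
        if self.use_disk:
            parts.append("Disk")
        if self.use_memory:
            parts.append("Memory (off heap)" if self.use_off_heap else "Memory")
        parts.append("Deserialized" if self.deserialized else "Serialized")
        parts.append(f"{self.replication}x Replicated")
        return " ".join(parts)


# Flag combinations for a few of the named levels, as I understand them.
LEVELS = {
    "MEMORY_ONLY": StorageLevel(False, True, False, True),
    "MEMORY_ONLY_SER": StorageLevel(False, True, False, False),
    "MEMORY_AND_DISK": StorageLevel(True, True, False, True),
    "MEMORY_AND_DISK_SER": StorageLevel(True, True, False, False),
    "DISK_ONLY": StorageLevel(True, False, False, False),
}

for name, level in LEVELS.items():
    print(f"{name:20s} -> {level.description()}")
```

Under these assumptions, a label with no leading "Disk" means the RDD was persisted at a level that never writes blocks to disk, which is consistent with the flag not reaching the persist call.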

I even tried hardcoding storageLevel in BoundedDataset.java (based on the
line number in the DAG viz). Unfortunately it still shows up as memory-only.

Am I missing something that would let me spill data to disk?

For reference, here's my exact command line:
/opt/spark/bin/spark-submit
--master 'k8s://https://kubernetes:443'
--deploy-mode client
--name $(MY_POD_NAME)
--conf spark.executor.instances=20
--conf spark.driver.host=$(MY_POD_IP)
--conf spark.driver.port=7077
--conf spark.kubernetes.container.image=$(MY_IMAGE)
--conf spark.kubernetes.driver.pod.name=$(MY_POD_NAME)
--conf spark.kubernetes.executor.podNamePrefix=$(MY_POD_NAME)
--conf spark.executor.memory=5500m
--conf spark.executor.memoryOverhead=1300m
--conf spark.memory.fraction=0.45
--conf spark.executor.cores=3
--conf spark.kubernetes.executor.limit.cores=3
--conf spark.default.parallelism=60
--conf spark.kubernetes.allocation.batch.size=20
--conf spark.kubernetes.driver.label.app=beam-datomic-smoketest
--conf spark.kubernetes.node.selector.node.ladderlife.com/group=etl
--conf spark.kubernetes.executor.annotation.iam.amazonaws.com/role=etl-role
--conf spark.kubernetes.executor.secrets.google-cloud=/google-cloud-secrets
--conf spark.kubernetes.executor.secretKeyRef.SENTRY_DSN=sentry-secrets:dsn
--conf spark.executorEnv.STATSD_HOST=169.254.168.253
--class ladder.my_beam_job
local:///srv/beam_job.jar
--runner=SparkRunner
--storageLevel=MEMORY_AND_DISK
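
As a rough guide to how much room cached RDDs have under the settings above, here is a back-of-the-envelope sketch of Spark's unified memory arithmetic. The 300 MB reserved figure and the 0.5 storage fraction are Spark defaults as I understand them (spark.memory.storageFraction is not set in the command above, so that value is an assumption), and the JVM usually reports slightly less heap than the configured executor memory, so treat the numbers as approximate. The function names are hypothetical.

```python
RESERVED_MB = 300  # assumed: Spark's fixed reserved system memory


def unified_memory_mb(heap_mb: float, memory_fraction: float) -> float:
    """Execution + storage pool: (heap - reserved) * spark.memory.fraction."""
    return (heap_mb - RESERVED_MB) * memory_fraction


def storage_floor_mb(unified_mb: float, storage_fraction: float = 0.5) -> float:
    """Portion of the pool where cached blocks are immune to eviction."""
    return unified_mb * storage_fraction


# Values taken from the spark-submit command above.
executor_memory_mb = 5500   # spark.executor.memory
memory_overhead_mb = 1300   # spark.executor.memoryOverhead
memory_fraction = 0.45      # spark.memory.fraction
executors = 20              # spark.executor.instances

unified = unified_memory_mb(executor_memory_mb, memory_fraction)
floor = storage_floor_mb(unified)
pod_request_mb = executor_memory_mb + memory_overhead_mb

print(f"unified pool per executor:   {unified:.0f} MB")
print(f"protected storage share:     {floor:.0f} MB")
print(f"memory request per executor: {pod_request_mb} MB")
print(f"unified pool across cluster: {unified * executors:.0f} MB")
```

With a MEMORY_ONLY level, cached partitions that do not fit in this pool are dropped and recomputed rather than written to disk, which is why the storage level matters for larger datasets.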

Thanks,
Mike.

Ladder <http://bit.ly/1VRtWfS>. The smart, modern way to insure your life.

Re: Spark storageLevel not taking effect

Posted by Juan Carlos Garcia <jc...@gmail.com>.
Hi Mike,

From the documentation on
https://beam.apache.org/documentation/runners/spark/#pipeline-options-for-the-spark-runner

storageLevel: The StorageLevel to use when caching RDDs in batch pipelines.
The Spark Runner automatically caches RDDs that are evaluated repeatedly.
This is a batch-only property as streaming pipelines in Beam are stateful,
which requires Spark DStream's StorageLevel to be MEMORY_ONLY.
Default: MEMORY_ONLY

So I think you are out of luck here.




-- 

JC