Posted to user@beam.apache.org by Michael Kuchnik <mk...@andrew.cmu.edu> on 2021/03/31 16:00:36 UTC

[Question] Docker and File Errors with Spark PortableRunner

Hello,
I am trying to get a Beam over Spark cluster working. As a simple test, I
am running the Python Pi estimation example (PiEstimation) [1,2] in both
Beam and Spark. The Spark example works end to end, but the Beam example
fails when run with PortableRunner; it works locally with DirectRunner, so
the problem is specific to distributed execution. The issue seems to
be related to: 1) the transport layer responsible for node-to-node
communication or 2) Docker bootup. It’d be helpful to have some additional
insight into the proper configuration required to allow PortableRunner to
run successfully.
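For reference, the submission looks roughly like the following (a sketch of
my invocation; the job endpoint port 8099 is the job server default and an
assumption here, and the image name is my local mirror):

```shell
# Submit the Pi estimation pipeline to the portable job server on node_0.
# --environment_config selects the SDK harness image (details vary per setup).
python estimate_pi.py \
  --runner=PortableRunner \
  --job_endpoint=10.111.4.78:8099 \
  --environment_type=DOCKER \
  --environment_config="$LOCAL_ENDPOINT/mkuchnik/beam_python3.7_sdk:2.28.0"
```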

[System Setup]
I am running on 2 nodes: node_0 and node_1. Node_0 controls
metadata/scheduling activity (e.g., Spark master and Beam coordination)
while node_1 is only a worker. Since I have confirmed both PiEstimation
examples work locally (e.g., DirectRunner), the only thing I am testing is
if I can compute the same solution except with the complexity of a remote
worker (node_1). Nodes are running Ubuntu 18 with Intel Xeons. The nodes
have a network filesystem for sharing data (sharing /users/mkuchnik/) and
there is a network Proxy. I am using Apache Beam 2.28.0 on an Anaconda
python3.7 install. Spark install is spark-2.4.7-bin-hadoop2.7.

[Software Setup]
I’ve tried 3 setups so far. The first 2 are permuting the SDK harness’s
environment types [4], and the third is a suggestion off the web [5]. I
start the server on node_0 using `http_proxy="" https_proxy="" docker run
--net=host $LOCAL_ENDPOINT/mkuchnik/beam_spark_job_server
--spark-master-url=spark://10.111.4.78:7077 --job-host=10.111.4.78
--artifacts-dir=/users/mkuchnik/staging`. $LOCAL_ENDPOINT points to a local
Docker repository that is mirroring the publicly available Docker images.

1. [PortableRunner + DOCKER]. From my understanding, this should
automatically pull and orchestrate any Docker interactions. I set
environment_config to point to the local repository’s mirror of the Docker
image ($LOCAL_ENDPOINT/mkuchnik/beam_python3.7_sdk:2.28.0). The logs show
both a) a filesystem error and b) a Docker error:
a) Failed to retrieve staged files: failed to retrieve /tmp/staged in 3
attempts: failed to retrieve chunk for /tmp/staged/pickled_main_session
b)
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.UncheckedExecutionException:
java.lang.IllegalStateException: No container running for id
642a9648daab65dda2c12806d7bb6da976d0b8c269cf6438be5e34b6551d36c3

2. [PortableRunner + EXTERNAL] This should be the same as #1, except that I
must start the SDK harness daemon myself on each node that uses Docker. I
start the SDK on the two nodes without any filesystem sharing (i.e., no
"-v" mounts): `pdsh -w h-[0-1] docker run --net=host -d --rm
$LOCAL_ENDPOINT/mkuchnik/beam_python3.7_sdk:2.28.0 --worker_pool`, which
successfully starts the containers. The submitted job blocks forever while
waiting to connect to the container. Again, file sharing fails, this time
with java.io.FileNotFoundException:
/users/mkuchnik/staging/b9705abc52e03bf2720205478fdb59132b53c528d4a0ec85cfcd6f15d9ea9f83/1-ref_Environment_default_e-pickled_main_session
(No such file or directory).
Adding a filesystem mount (with “-v /users/mkuchnik:/users/mkuchnik”) to
the containers doesn’t seem to work.
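For completeness, the EXTERNAL submission differs from the DOCKER one only
in the environment flags, roughly as follows (a sketch; ports are
assumptions, with 8099 the job server default and 50000 the default
--worker_pool port):

```shell
# Submit against the externally managed SDK harness pool started via pdsh
# above; environment_config points at the worker pool's host:port.
python estimate_pi.py \
  --runner=PortableRunner \
  --job_endpoint=10.111.4.78:8099 \
  --environment_type=EXTERNAL \
  --environment_config=localhost:50000
```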

3. [SparkRunner + Uber Jar] This approach doesn’t seem to be documented,
but I can reproduce what I see in the StackOverflow thread [5]. As stated
in that thread, the JAR file is “shared” through /tmp, which is a local
filesystem. I didn’t pursue this approach further as the original thread
never reached a resolution even after sharing /tmp.

[Preliminary Conclusion]
The three cases above indicate that there is a file-sharing exchange that
must occur for the worker to start. These files may have issues being
shared across nodes, as well as between the OS and Docker. For the first
two approaches, I do see a Java process appear on node_1 that uses about 1%
of CPU, so it seems to me that this points to a failed rendezvous with the
Docker container started by a Java daemon on node_1.

[Miscellaneous]
I use http_proxy to access the web, but I disable it for running the
experiments, since gRPC will automatically use these environment variables
[6], causing localhost resolution to fail. For related reasons, I use the
local Docker repository rather than Docker Hub.
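Concretely, I clear the proxy variables for the run, along these lines (a
sketch; the no_proxy exemption is an alternative I have not relied on):

```shell
# gRPC honors http_proxy/https_proxy, which can route localhost traffic
# through the proxy; clearing them for the run avoids that failure mode.
http_proxy="" https_proxy="" python estimate_pi.py \
  --runner=PortableRunner \
  --job_endpoint=10.111.4.78:8099
```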

[Attached Files]
I've attached the stderr/stdout from launching the respective commands in
*_run.txt. I am also attaching the worker stderr logs for the first two
setups (*_worker_log.txt). The estimate_pi.py code can be grabbed from
[2]; to make it consistent with the Spark example, I write the estimate to
stdout rather than to a file (e.g., `| "print" >> beam.Map(print)`).

[1]
https://github.com/apache/spark/blob/master/examples/src/main/python/pi.py
[2]
https://github.com/apache/beam/blob/master/sdks/python/apache_beam/examples/complete/estimate_pi.py
[3] https://beam.apache.org/documentation/runners/spark/
[4] https://beam.apache.org/documentation/runtime/sdk-harness-config/
[5]
https://stackoverflow.com/questions/66320831/its-possible-to-configure-the-beam-portable-runner-with-the-spark-configuration

Thank you,
Michael

Re: [Question] Docker and File Errors with Spark PortableRunner

Posted by Kyle Weaver <kc...@google.com>.
Hi Michael,

Your problems in 1. and 2. are related to the artifact staging workflow,
where Beam tries to copy your pipeline’s dependencies to the workers. When
artifacts cannot be fetched because of file system or other issues, the
workers cannot be started successfully. In this case, your pipeline depends
on the pickled main session from estimate_pi.py [1].

In order for artifact staging to work, the job server’s --artifacts-dir
must be accessible by the Spark worker.* Since you start your job server in
a Docker container, /users/mkuchnik/staging is hidden inside that Docker
container’s filesystem, which is not accessible from your network
filesystem. You mentioned in 2. that you tried mounting the directory to
the [worker (?)] containers, but have you tried mounting that directory to
the job server container?
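Concretely, that would look something like the following (a sketch of your
original command with only the -v mount added; addresses, image, and paths
are taken from your earlier message):

```shell
# Mount the shared staging directory into the job server container so that
# --artifacts-dir resolves to the network filesystem, not to a path that is
# private to the container.
http_proxy="" https_proxy="" docker run --net=host \
  -v /users/mkuchnik/staging:/users/mkuchnik/staging \
  "$LOCAL_ENDPOINT/mkuchnik/beam_spark_job_server" \
  --spark-master-url=spark://10.111.4.78:7077 \
  --job-host=10.111.4.78 \
  --artifacts-dir=/users/mkuchnik/staging
```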

Thanks,
Kyle

* It looks like this is unclear in current documentation, so I will edit it.

[1]
https://github.com/apache/beam/blob/2c619c81082839e054f16efee9311b9f74b6e436/sdks/python/apache_beam/examples/complete/estimate_pi.py#L118