You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@beam.apache.org by "jeremyje (via GitHub)" <gi...@apache.org> on 2023/04/29 16:14:35 UTC

[GitHub] [beam] jeremyje opened a new issue, #26485: [Bug]: Difficult to use Portable Flink Runner with Go

jeremyje opened a new issue, #26485:
URL: https://github.com/apache/beam/issues/26485

   ### What happened?
   
   Request to update the instructions on how to run Apache Beam portable runner on Flink and consider Go. Also test that the instructions work on newer versions of Flink (1.17 or 1.16) with updated runner docker images.
   
   I followed instructions on running Apache Flink with Apache Beam 2.46.0 Go SDK. I'm hitting all sorts of issues and have not gotten my pipelines to work based on the instructions given at:
   
   * https://beam.apache.org/documentation/runners/flink/
   * https://beam.apache.org/documentation/runtime/sdk-harness-config/
   
   At best, I am able to bring up a Flink instance with a pipeline that processes a limited amount of data ~200 MB and then seizes without any feedback. This pipeline runs in `environment_type=LOOPBACK` mode. I'm starting to think there's some weird compatibility issue as I'd really prefer to run in the default `-environment_type=DOCKER` mode. It's not clear if this mode supported given the example goes up to Flink 1.14 which is no longer listed on the Flink site. 
   
   **It'd be nice to get updated documentation for running Beam on a Flink cluster and have it target a newer version, ~Flink 1.17.**
   
   These are the steps I've taken:
   ```bash
   flink-1.15.4$ ./bin/start-cluster.sh 
   Starting cluster.
   Starting standalonesession daemon on host coder.
   Starting taskexecutor daemon on host coder.
   
   $ docker run --net=host apache/beam_flink1.15_job_server:2.46.0 --flink-master=coder:8081 --job-host=coder --artifacts-dir=/opt/artifacts
   
   ```
   
   Run the Pipeline with the following parameters:
   ```bash
   go run mypipeline.go -runner flink -endpoint=coder:8099 -environment_type=LOOPBACK
   
   root_transform_ids: "s1"
   2023/04/29 07:40:03 Cross-compiling /home/coder/project/mypipeline.go as /tmp/worker-1-1682754003926484499
   2023/04/29 07:40:05 Prepared job with id: go-job-1-1682754003926481819_79b82cf7-1e81-4c5a-a49d-253712a5c422 and staging token: go-job-1-1682754003926481819_79b82cf7-1e81-4c5a-a49d-253712a5c422
   2023/04/29 07:40:05 Staged binary artifact with token: 
   2023/04/29 07:40:05 Submitted job: go0job0101682754003926481819-root-0429074005-d5669737_2f0246ce-150e-4e32-a234-e2f03f9c9222
   2023/04/29 07:40:05 Job state: STOPPED
   2023/04/29 07:40:05 Job state: STARTING
   2023/04/29 07:40:05 Job state: RUNNING
   2023/04/29 07:40:16 starting worker 1-1
   ```
   
   Only `-environment_type=LOOPBACK` has worked for me to submit jobs but it still fails after processing about 100MiB of data which the memory limits are well above that (see the `flink-conf.yaml` below shows `taskmanager.memory.process.size: 20g`)
   With trying the various ways to run `job-server` the most I get is a successful job submit in
   
   
   I've attempted other ways to workaround the various errors I've been seeing such as no docker available in the `apache/beam_flink1.15_job_server:2.46.0` container.
   
   Versions
   ```
   $ uname -a
   Linux coder 5.15.0-71-generic #78-Ubuntu SMP Tue Apr 18 09:00:29 UTC 2023 x86_64 x86_64 x86_64 GNU/Linux
   
   $ docker version
   Client: Docker Engine - Community
    Version:           23.0.5
    API version:       1.42
    Go version:        go1.19.8
    Git commit:        bc4487a
    Built:             Wed Apr 26 16:21:07 2023
    OS/Arch:           linux/amd64
    Context:           default
   
   Server: Docker Engine - Community
    Engine:
     Version:          23.0.5
     API version:      1.42 (minimum version 1.12)
     Go version:       go1.19.8
     Git commit:       94d3ad6
     Built:            Wed Apr 26 16:21:07 2023
     OS/Arch:          linux/amd64
     Experimental:     false
    containerd:
     Version:          1.6.20
     GitCommit:        2806fc1057397dbaeefbea0e4e17bddfbd388f38
    runc:
     Version:          1.1.5
     GitCommit:        v1.1.5-0-gf19387a
    docker-init:
     Version:          0.19.0
     GitCommit:        de40ad0
   
   $ go version
   go version go1.20.2 linux/amd64
   
   
   ```
   
   Attempt with `docker-compose` on the job server:
   
   ```yaml
   version: '3.9'
   services:
     jobserver:
       hostname: jobserver
       # image: apache/beam_flink1.15_job_server:2.46.0
       image: jeremyje/beam_flink1.15_job_server:2.46.0
       build:
         context: .
         dockerfile: Dockerfile
       ports:
         - "8099:8099"
         - "8098:8098"
         - "8097:8097"
       command: ["--flink-master=localhost:8081", "--job-host=coder", "--artifacts-dir", "/opt/artifacts"]
       volumes:
         - /var/run/docker.sock:/var/run/docker.sock
       network_mode: host
   ```
   The docker image for `jeremyje/beam_flink1.15_job_server:2.46.0` image when trying to get docker mode to work.
   
   ```Dockerfile
   # Creates an image that also has docker installed in hopes that it works with the -environment_type=DOCKER mode.
   FROM apache/beam_flink1.15_job_server:2.46.0
   
   RUN curl -fsSL https://get.docker.com -o get-docker.sh
   RUN sh ./get-docker.sh
   ```
   
   
   My `conf/flink-config.yaml` which mainly exposes all the ports outside of the machine so I can submit jobs from another machine.
   
   ```yaml
   ################################################################################
   #  Licensed to the Apache Software Foundation (ASF) under one
   #  or more contributor license agreements.  See the NOTICE file
   #  distributed with this work for additional information
   #  regarding copyright ownership.  The ASF licenses this file
   #  to you under the Apache License, Version 2.0 (the
   #  "License"); you may not use this file except in compliance
   #  with the License.  You may obtain a copy of the License at
   #
   #      http://www.apache.org/licenses/LICENSE-2.0
   #
   #  Unless required by applicable law or agreed to in writing, software
   #  distributed under the License is distributed on an "AS IS" BASIS,
   #  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   #  See the License for the specific language governing permissions and
   # limitations under the License.
   ################################################################################
   
   
   #==============================================================================
   # Common
   #==============================================================================
   
   # The external address of the host on which the JobManager runs and can be
   # reached by the TaskManagers and any clients which want to connect. This setting
   # is only used in Standalone mode and may be overwritten on the JobManager side
   # by specifying the --host <hostname> parameter of the bin/jobmanager.sh executable.
   # In high availability mode, if you use the bin/start-cluster.sh script and setup
   # the conf/masters file, this will be taken care of automatically. Yarn
   # automatically configure the host name based on the hostname of the node where the
   # JobManager runs.
   
   jobmanager.rpc.address: localhost
   
   # The RPC port where the JobManager is reachable.
   
   jobmanager.rpc.port: 6123
   
   # The host interface the JobManager will bind to. My default, this is localhost, and will prevent
   # the JobManager from communicating outside the machine/container it is running on.
   # On YARN this setting will be ignored if it is set to 'localhost', defaulting to 0.0.0.0.
   # On Kubernetes this setting will be ignored, defaulting to 0.0.0.0.
   #
   # To enable this, set the bind-host address to one that has access to an outside facing network
   # interface, such as 0.0.0.0.
   
   jobmanager.bind-host: 0.0.0.0
   
   
   # The total process memory size for the JobManager.
   #
   # Note this accounts for all memory usage within the JobManager process, including JVM metaspace and other overhead.
   
   jobmanager.memory.process.size: 1600m
   
   # The host interface the TaskManager will bind to. By default, this is localhost, and will prevent
   # the TaskManager from communicating outside the machine/container it is running on.
   # On YARN this setting will be ignored if it is set to 'localhost', defaulting to 0.0.0.0.
   # On Kubernetes this setting will be ignored, defaulting to 0.0.0.0.
   #
   # To enable this, set the bind-host address to one that has access to an outside facing network
   # interface, such as 0.0.0.0.
   
   taskmanager.bind-host: 0.0.0.0
   
   # The address of the host on which the TaskManager runs and can be reached by the JobManager and
   # other TaskManagers. If not specified, the TaskManager will try different strategies to identify
   # the address.
   #
   # Note this address needs to be reachable by the JobManager and forward traffic to one of
   # the interfaces the TaskManager is bound to (see 'taskmanager.bind-host').
   #
   # Note also that unless all TaskManagers are running on the same machine, this address needs to be
   # configured separately for each TaskManager.
   
   taskmanager.host: localhost
   
   # The total process memory size for the TaskManager.
   #
   # Note this accounts for all memory usage within the TaskManager process, including JVM metaspace and other overhead.
   
   taskmanager.memory.process.size: 20g
   
   # To exclude JVM metaspace and overhead, please, use total Flink memory size instead of 'taskmanager.memory.process.size'.
   # It is not recommended to set both 'taskmanager.memory.process.size' and Flink memory.
   #
   # taskmanager.memory.flink.size: 1280m
   
   # The number of task slots that each TaskManager offers. Each slot runs one parallel pipeline.
   
   taskmanager.numberOfTaskSlots: 1
   
   # The parallelism used for programs that did not specify and other parallelism.
   
   parallelism.default: 10
   
   rest.flamegraph.enabled: true
   
   # The default file system scheme and authority.
   # 
   # By default file paths without scheme are interpreted relative to the local
   # root file system 'file:///'. Use this to override the default and interpret
   # relative paths relative to a different file system,
   # for example 'hdfs://mynamenode:12345'
   #
   # fs.default-scheme
   
   #==============================================================================
   # High Availability
   #==============================================================================
   
   # The high-availability mode. Possible options are 'NONE' or 'zookeeper'.
   #
   # high-availability: zookeeper
   
   # The path where metadata for master recovery is persisted. While ZooKeeper stores
   # the small ground truth for checkpoint and leader election, this location stores
   # the larger objects, like persisted dataflow graphs.
   # 
   # Must be a durable file system that is accessible from all nodes
   # (like HDFS, S3, Ceph, nfs, ...) 
   #
   # high-availability.storageDir: hdfs:///flink/ha/
   
   # The list of ZooKeeper quorum peers that coordinate the high-availability
   # setup. This must be a list of the form:
   # "host1:clientPort,host2:clientPort,..." (default clientPort: 2181)
   #
   # high-availability.zookeeper.quorum: localhost:2181
   
   
   # ACL options are based on https://zookeeper.apache.org/doc/r3.1.2/zookeeperProgrammers.html#sc_BuiltinACLSchemes
   # It can be either "creator" (ZOO_CREATE_ALL_ACL) or "open" (ZOO_OPEN_ACL_UNSAFE)
   # The default value is "open" and it can be changed to "creator" if ZK security is enabled
   #
   # high-availability.zookeeper.client.acl: open
   
   #==============================================================================
   # Fault tolerance and checkpointing
   #==============================================================================
   
   # The backend that will be used to store operator state checkpoints if
   # checkpointing is enabled. Checkpointing is enabled when execution.checkpointing.interval > 0.
   #
   # Execution checkpointing related parameters. Please refer to CheckpointConfig and ExecutionCheckpointingOptions for more details.
   #
   # execution.checkpointing.interval: 3min
   # execution.checkpointing.externalized-checkpoint-retention: [DELETE_ON_CANCELLATION, RETAIN_ON_CANCELLATION]
   # execution.checkpointing.max-concurrent-checkpoints: 1
   # execution.checkpointing.min-pause: 0
   # execution.checkpointing.mode: [EXACTLY_ONCE, AT_LEAST_ONCE]
   # execution.checkpointing.timeout: 10min
   # execution.checkpointing.tolerable-failed-checkpoints: 0
   # execution.checkpointing.unaligned: false
   #
   # Supported backends are 'hashmap', 'rocksdb', or the
   # <class-name-of-factory>.
   #
   # state.backend: hashmap
   
   # Directory for checkpoints filesystem, when using any of the default bundled
   # state backends.
   #
   # state.checkpoints.dir: hdfs://namenode-host:port/flink-checkpoints
   
   # Default target directory for savepoints, optional.
   #
   # state.savepoints.dir: hdfs://namenode-host:port/flink-savepoints
   
   # Flag to enable/disable incremental checkpoints for backends that
   # support incremental checkpoints (like the RocksDB state backend). 
   #
   # state.backend.incremental: false
   
   # The failover strategy, i.e., how the job computation recovers from task failures.
   # Only restart tasks that may have been affected by the task failure, which typically includes
   # downstream tasks and potentially upstream tasks if their produced data is no longer available for consumption.
   
   jobmanager.execution.failover-strategy: region
   
   #==============================================================================
   # Rest & web frontend
   #==============================================================================
   
   # The port to which the REST client connects to. If rest.bind-port has
   # not been specified, then the server will bind to this port as well.
   #
   #rest.port: 8081
   
   # The address to which the REST client will connect to
   #
   rest.address: localhost
   
   # Port range for the REST and web server to bind to.
   #
   #rest.bind-port: 8080-8090
   
   # The address that the REST & web server binds to
   # By default, this is localhost, which prevents the REST & web server from
   # being able to communicate outside of the machine/container it is running on.
   #
   # To enable this, set the bind address to one that has access to outside-facing
   # network interface, such as 0.0.0.0.
   #
   rest.bind-address: 0.0.0.0
   
   # Flag to specify whether job submission is enabled from the web-based
   # runtime monitor. Uncomment to disable.
   
   #web.submit.enable: false
   
   # Flag to specify whether job cancellation is enabled from the web-based
   # runtime monitor. Uncomment to disable.
   
   #web.cancel.enable: false
   
   #==============================================================================
   # Advanced
   #==============================================================================
   
   # Override the directories for temporary files. If not specified, the
   # system-specific Java temporary directory (java.io.tmpdir property) is taken.
   #
   # For framework setups on Yarn, Flink will automatically pick up the
   # containers' temp directories without any need for configuration.
   #
   # Add a delimited list for multiple directories, using the system directory
   # delimiter (colon ':' on unix) or a comma, e.g.:
   #     /data1/tmp:/data2/tmp:/data3/tmp
   #
   # Note: Each directory entry is read from and written to by a different I/O
   # thread. You can include the same directory multiple times in order to create
   # multiple I/O threads against that directory. This is for example relevant for
   # high-throughput RAIDs.
   #
   # io.tmp.dirs: /tmp
   
   # The classloading resolve order. Possible values are 'child-first' (Flink's default)
   # and 'parent-first' (Java's default).
   #
   # Child first classloading allows users to use different dependency/library
   # versions in their application than those in the classpath. Switching back
   # to 'parent-first' may help with debugging dependency issues.
   #
   # classloader.resolve-order: child-first
   
   # The amount of memory going to the network stack. These numbers usually need 
   # no tuning. Adjusting them may be necessary in case of an "Insufficient number
   # of network buffers" error. The default min is 64MB, the default max is 1GB.
   # 
   # taskmanager.memory.network.fraction: 0.1
   # taskmanager.memory.network.min: 64mb
   # taskmanager.memory.network.max: 1gb
   
   #==============================================================================
   # Flink Cluster Security Configuration
   #==============================================================================
   
   # Kerberos authentication for various components - Hadoop, ZooKeeper, and connectors -
   # may be enabled in four steps:
   # 1. configure the local krb5.conf file
   # 2. provide Kerberos credentials (either a keytab or a ticket cache w/ kinit)
   # 3. make the credentials available to various JAAS login contexts
   # 4. configure the connector to use JAAS/SASL
   
   # The below configure how Kerberos credentials are provided. A keytab will be used instead of
   # a ticket cache if the keytab path and principal are set.
   
   # security.kerberos.login.use-ticket-cache: true
   # security.kerberos.login.keytab: /path/to/kerberos/keytab
   # security.kerberos.login.principal: flink-user
   
   # The configuration below defines which JAAS login contexts
   
   # security.kerberos.login.contexts: Client,KafkaClient
   
   #==============================================================================
   # ZK Security Configuration
   #==============================================================================
   
   # Below configurations are applicable if ZK ensemble is configured for security
   
   # Override below configuration to provide custom ZK service name if configured
   # zookeeper.sasl.service-name: zookeeper
   
   # The configuration below must match one of the values set in "security.kerberos.login.contexts"
   # zookeeper.sasl.login-context-name: Client
   
   #==============================================================================
   # HistoryServer
   #==============================================================================
   
   # The HistoryServer is started and stopped via bin/historyserver.sh (start|stop)
   
   # Directory to upload completed jobs to. Add this directory to the list of
   # monitored directories of the HistoryServer as well (see below).
   #jobmanager.archive.fs.dir: hdfs:///completed-jobs/
   
   # The address under which the web-based HistoryServer listens.
   #historyserver.web.address: 0.0.0.0
   
   # The port under which the web-based HistoryServer listens.
   #historyserver.web.port: 8082
   
   # Comma separated list of directories to monitor for completed jobs.
   #historyserver.archive.fs.dir: hdfs:///completed-jobs/
   
   # Interval in milliseconds for refreshing the monitored directories.
   #historyserver.archive.fs.refresh-interval: 10000
   ```
   
   Logs from the jobserver.
   ```bash
   Apr 29, 2023 7:57:41 AM org.apache.beam.runners.jobsubmission.JobServerDriver createArtifactStagingService
   INFO: ArtifactStagingService started on coder:8098
   Apr 29, 2023 7:57:41 AM org.apache.beam.runners.jobsubmission.JobServerDriver createExpansionService
   INFO: Java ExpansionService started on coder:8097
   Apr 29, 2023 7:57:41 AM org.apache.beam.runners.jobsubmission.JobServerDriver createJobServer
   INFO: JobService started on coder:8099
   Apr 29, 2023 7:57:41 AM org.apache.beam.runners.jobsubmission.JobServerDriver run
   INFO: Job server now running, terminate with Ctrl+C
   Apr 29, 2023 7:58:01 AM org.apache.beam.runners.fnexecution.artifact.ArtifactStagingService$2 onNext
   INFO: Staging artifacts for go-job-1-1682755079768243510_745fdccd-63e2-4758-a101-cafcd659efe5.
   Apr 29, 2023 7:58:01 AM org.apache.beam.runners.fnexecution.artifact.ArtifactStagingService$2 resolveNextEnvironment
   INFO: Resolving artifacts for go-job-1-1682755079768243510_745fdccd-63e2-4758-a101-cafcd659efe5.go.
   Apr 29, 2023 7:58:01 AM org.apache.beam.runners.fnexecution.artifact.ArtifactStagingService$2 onNext
   INFO: Getting 1 artifacts for go-job-1-1682755079768243510_745fdccd-63e2-4758-a101-cafcd659efe5.null.
   Apr 29, 2023 7:58:01 AM org.apache.beam.runners.fnexecution.artifact.ArtifactStagingService$2 finishStaging
   INFO: Artifacts fully staged for go-job-1-1682755079768243510_745fdccd-63e2-4758-a101-cafcd659efe5.
   Apr 29, 2023 7:58:02 AM org.apache.beam.runners.flink.FlinkJobInvoker invokeWithExecutor
   INFO: Invoking job go0job0101682755079768243510-root-0429075801-3c2bd8bd_99e87770-cacd-452f-a653-586d72d5adc7 with pipeline runner org.apache.beam.runners.flink.FlinkPipelineRunner@3294e616
   Apr 29, 2023 7:58:02 AM org.apache.beam.runners.jobsubmission.JobInvocation start
   INFO: Starting job invocation go0job0101682755079768243510-root-0429075801-3c2bd8bd_99e87770-cacd-452f-a653-586d72d5adc7
   Apr 29, 2023 7:58:02 AM org.apache.beam.runners.flink.FlinkPipelineRunner runPipelineWithTranslator
   INFO: Translating pipeline to Flink program.
   Apr 29, 2023 7:58:02 AM org.apache.beam.runners.flink.FlinkExecutionEnvironments createBatchExecutionEnvironment
   INFO: Creating a Batch Execution Environment.
   Apr 29, 2023 7:58:02 AM org.apache.beam.runners.flink.FlinkExecutionEnvironments createBatchExecutionEnvironment
   INFO: Using Flink Master URL localhost:8081.
   Apr 29, 2023 7:58:09 AM org.apache.flink.api.java.utils.PlanGenerator logTypeRegistrationDetails
   INFO: The job has 0 registered types and 0 default Kryo serializers
   Apr 29, 2023 7:58:09 AM org.apache.flink.client.program.rest.RestClusterClient lambda$submitJob$7
   INFO: Submitting job 'go0job0101682755079768243510-root-0429075801-3c2bd8bd' (bf1c584a8970302886d35baf0a17fb72).
   Apr 29, 2023 7:58:12 AM org.apache.flink.client.program.rest.RestClusterClient lambda$null$6
   INFO: Successfully submitted job 'go0job0101682755079768243510-root-0429075801-3c2bd8bd' (bf1c584a8970302886d35baf0a17fb72) to 'http://localhost:8081'.
   ```
   
   ### Issue Priority
   
   Priority: 3 (minor)
   
   ### Issue Components
   
   - [ ] Component: Python SDK
   - [ ] Component: Java SDK
   - [X] Component: Go SDK
   - [ ] Component: Typescript SDK
   - [ ] Component: IO connector
   - [ ] Component: Beam examples
   - [ ] Component: Beam playground
   - [ ] Component: Beam katas
   - [ ] Component: Website
   - [ ] Component: Spark Runner
   - [X] Component: Flink Runner
   - [ ] Component: Samza Runner
   - [ ] Component: Twister2 Runner
   - [ ] Component: Hazelcast Jet Runner
   - [ ] Component: Google Cloud Dataflow Runner


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [I] [Bug]: Difficult to use Portable Flink Runner with Go [beam]

Posted by "esequielvirtuoso (via GitHub)" <gi...@apache.org>.
esequielvirtuoso commented on issue #26485:
URL: https://github.com/apache/beam/issues/26485#issuecomment-1856231403

   > Here's a tip for anyone who stumbles upon similar issues with spark portable runner.
   > 
   > The official document says you should use `apache/beam_spark_job_server` docker image with spark `3.2.x` but that didn't work for me. I used `apache/beam_spark3_job_server` with spark `3.1.2` instead.
   
   @LibofRelax thanks a lot for this answer. I spent 2 weeks trying distinct combinations of Spark and Beam versions without success.
   
   With your tip, it worked!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] LibofRelax commented on issue #26485: [Bug]: Difficult to use Portable Flink Runner with Go

Posted by "LibofRelax (via GitHub)" <gi...@apache.org>.
LibofRelax commented on issue #26485:
URL: https://github.com/apache/beam/issues/26485#issuecomment-1675640987

   I totally agree that we need better documentation of portable runners in general. I've been digging through python and go SDK code for days trying to get portable runners to work and OMG the official documentation was bad.
   
   It seems like you misunderstood LOOPBACK environment. Loopback starts a server on your host machine (not the flink node) and is solely for debugging locally. The beam generated code running on flink will connect to your go code on your machine to run UDFs, so it's bounded by local memory.
   
   As for DOCKER environment, the beam code will try to run the apache/beam_go_SDK image on flink node so you will need to be able to run docker on that machine. To run docker in docker containers, you need to set `privileged: true` in your docker-compose config. Although it would be nice to run the SDK container as a side car using the EXTERNAL environment option in containerized environments such as k8s, it looks like the go SDK has not provided worker pool in SDK harness container yet unlike python and java SDKs.
   
   Another important thing to note, you need to mount the job server artifact directory to SDK harness containers as they expect to find artifacts in the same directory.
   
   I'm not a beam expert though, please take these with a grain of salt.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [I] [Bug]: Difficult to use Portable Flink Runner with Go [beam]

Posted by "atombender (via GitHub)" <gi...@apache.org>.
atombender commented on issue #26485:
URL: https://github.com/apache/beam/issues/26485#issuecomment-1987173270

   Has there been any work on this? I'm struggling to understand how to make the Beam work through the Flink task manager on Kubernetes. Obviously, with Kubernetes, `DOCKER` is not an option, only `EXTERNAL`.
   
   With Python, it looks like you run `apache/beam_python_sdk --worker_pool` as a sidecar inside the Flink task manager pod. But the Beam boot command in `apache/beam_go_sdk` doesn't have this flag. It's also unclear what you're supposed to pass as arguments (like `--id`) to this command.
   
   There are zero examples on doing this with Go. Even after several years of apparent Beam + Go support, nobody seems to have posted any code examples, and there's just a tiny handful of StackOverflow posts that don't have any solutions.
   
   >  it looks like the go SDK has not provided worker pool in SDK harness container yet unlike python and java SDKs.
   
   It this a prerequisite, then? Are the plans to work on it?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] LibofRelax commented on issue #26485: [Bug]: Difficult to use Portable Flink Runner with Go

Posted by "LibofRelax (via GitHub)" <gi...@apache.org>.
LibofRelax commented on issue #26485:
URL: https://github.com/apache/beam/issues/26485#issuecomment-1675643530

   Here's a tip for anyone who stumbles upon similar issues with spark portable runner. 
   
   The official document says you should use `apache/beam_spark_job_server` with spark `3.2.x` but that didn't work for me. I used `apache/beam_spark3_job_server` with spark `3.1.2` instead.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] LibofRelax commented on issue #26485: [Bug]: Difficult to use Portable Flink Runner with Go

Posted by "LibofRelax (via GitHub)" <gi...@apache.org>.
LibofRelax commented on issue #26485:
URL: https://github.com/apache/beam/issues/26485#issuecomment-1700788584

   @jeremyje  It's been a while since I gave up on using Beam in production, so this sample might not be fully correct.
   
   Here's a sample docker compose config that kinda worked for me. It runs smoothly until UDF execution. There seems to be a version mismatch between the job server and the worker implementation despite they have the same version tag. The worker will expect a log endpoint from the job on spark worker which the spark worker does not seem to expose. You can try it out with `--environment EXTERNAL --environment_config beam-python-workers:50000`
   
   I suggest you try the docker-in-docker environment option. Maybe the default docker image is consistent with the job implementation. I already set the privileged flag to true in the compose file for that. If using `--environment DOCKER`, `beam-python-workers` service won't be needed.
   
   ```yml
   version: '3'
   
   volumes:
     tmp:
   
   services:
   
     spark:
       image: docker.io/bitnami/spark:3.1.2
       environment:
         - SPARK_MODE=master
       ports:
         - "8080:8080"
   
     spark-worker:
       image: docker.io/bitnami/spark:3.1.2
       privileged: true # To run docker SDK harness
       environment:
         - SPARK_MODE=worker
         - SPARK_MASTER_URL=spark://spark:7077
         - SPARK_WORKER_MEMORY=4g
         - SPARK_WORKER_CORES=1
         - BEAM_WORKER_POOL_IN_DOCKER_VM=1
         - DOCKER_MAC_CONTAINER=1
       ports:
         - "8081:8081"
         - "8100-8200:8100-8200"
       volumes:
         - tmp:/tmp
         - ./work/spark:/opt/bitnami/spark/work
   
     beam-python-workers:
       image: apache/beam_python3.10_sdk:2.49.0
       command: [ "--worker_pool" ]
       environment:
         - RUN_PYTHON_SDK_IN_DEFAULT_ENVIRONMENT=1
       volumes:
         - tmp:/tmp
   
     beam-job-server:
       image: apache/beam_spark3_job_server:2.49.0
       command: [ "--spark-master-url=spark://spark:7077" ]
       ports:
         - "4040:4040" # Spark job UI on the driver
         - "8099:8099" # Job endpoint
         - "8098:8098" # Artifact endpoint
       volumes:
         - tmp:/tmp
       depends_on:
         - spark
         - spark-worker
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] jeremyje commented on issue #26485: [Bug]: Difficult to use Portable Flink Runner with Go

Posted by "jeremyje (via GitHub)" <gi...@apache.org>.
jeremyje commented on issue #26485:
URL: https://github.com/apache/beam/issues/26485#issuecomment-1695853483

   @LibofRelax Do you have any sample configs or basic setup instructions beyond what was said before that would be useful? In particular how are you running the runner image and what version?
   
   Also I cannot find `3.1.2` of https://hub.docker.com/r/apache/beam_spark3_job_server/tags. The versions appear to be modeled after the beam version like `2.49.0`. Also same for, https://hub.docker.com/r/apache/beam_spark_job_server/tags.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org