Posted to user@beam.apache.org by Kenneth Knowles <ke...@apache.org> on 2023/08/14 17:57:05 UTC

Re: Seeking Assistance to Resolve Issues/bug with Flink Runner on Kubernetes

There is a Slack channel linked from
https://beam.apache.org/community/contact-us/; it is #beam on
the-asf.slack.com

(you find this via beam.apache.org -> Community -> Contact Us)

It sounds like an issue with running a multi-language pipeline on the
portable Flink runner (something I am not equipped to help with in
detail).

Kenn

On Wed, Aug 9, 2023 at 2:51 PM kapil singh <ka...@gmail.com> wrote:

> Hey,
>
> I've been grappling with this issue for the past five days and, despite my
> continuous efforts, I haven't found a resolution. Additionally, I've been
> unable to locate a Slack channel for Beam where I might seek assistance.
>
> The issue:
>
> *RuntimeError: Pipeline construction environment and pipeline runtime
> environment are not compatible. If you use a custom container image, check
> that the Python interpreter minor version and the Apache Beam version in
> your image match the versions used at pipeline construction time.
> Submission environment: beam:version:sdk_base:apache/beam_java8_sdk:2.48.0.
> Runtime environment:
> beam:version:sdk_base:apache/beam_python3.8_sdk:2.48.0.*
>
>
> Here is what I am trying to do:
>
> I am running the job from a Kubernetes container; it hits the job server,
> which then submits to the job manager and task manager.
> The task manager and job manager are each one container.
>
> Here is my custom Dockerfile (image name: custom-flink):
>
> # Starting with the base Flink image
> FROM apache/flink:1.16-java11
> ARG FLINK_VERSION=1.16
> ARG KAFKA_VERSION=2.8.0
>
> # Install python3.8 and its associated dependencies, followed by pyflink
> RUN set -ex; \
>     apt-get update && \
>     apt-get install -y build-essential libssl-dev zlib1g-dev libbz2-dev libffi-dev lzma liblzma-dev && \
>     wget https://www.python.org/ftp/python/3.8.0/Python-3.8.0.tgz && \
>     tar -xvf Python-3.8.0.tgz && \
>     cd Python-3.8.0 && \
>     ./configure --without-tests --enable-shared && \
>     make -j4 && \
>     make install && \
>     ldconfig /usr/local/lib && \
>     cd .. && rm -f Python-3.8.0.tgz && rm -rf Python-3.8.0 && \
>     ln -s /usr/local/bin/python3.8 /usr/local/bin/python && \
>     ln -s /usr/local/bin/pip3.8 /usr/local/bin/pip && \
>     apt-get clean && \
>     rm -rf /var/lib/apt/lists/* && \
>     python -m pip install --upgrade pip; \
>     pip install apache-flink==${FLINK_VERSION}; \
>     pip install kafka-python
>
> RUN pip install --no-cache-dir apache-beam[gcp]==2.48.0
>
> # Copy files from official SDK image, including script/dependencies.
> COPY --from=apache/beam_python3.8_sdk:2.48.0 /opt/apache/beam/ /opt/apache/beam/
>
> # Java SDK
> COPY --from=apache/beam_java11_sdk:2.48.0 /opt/apache/beam/ /opt/apache/beam_java/
>
> RUN apt-get update && apt-get install -y python3-venv && rm -rf /var/lib/apt/lists/*
>
> # Give permissions to the /opt/apache/beam-venv directory
> RUN mkdir -p /opt/apache/beam-venv && chown -R 9999:9999 /opt/apache/beam-venv
>
> Here are my deployment manifests for the job manager, the task manager
> (plus worker pool), and the job server:
>
>
> apiVersion: v1
> kind: Service
> metadata:
>   name: flink-jobmanager
>   namespace: flink
> spec:
>   type: ClusterIP
>   ports:
>   - name: rpc
>     port: 6123
>   - name: blob-server
>     port: 6124
>   - name: webui
>     port: 8081
>   selector:
>     app: flink
>     component: jobmanager
> ---
> apiVersion: v1
> kind: Service
> metadata:
>   name: beam-worker-pool
>   namespace: flink
> spec:
>   selector:
>     app: flink
>     component: taskmanager
>   ports:
>   - protocol: TCP
>     port: 50000
>     targetPort: 50000
>     name: pool
> ---
> apiVersion: apps/v1
> kind: Deployment
> metadata:
>   name: flink-jobmanager
>   namespace: flink
> spec:
>   replicas: 1
>   selector:
>     matchLabels:
>       app: flink
>       component: jobmanager
>   template:
>     metadata:
>       labels:
>         app: flink
>         component: jobmanager
>     spec:
>       containers:
>       - name: jobmanager
>         image: custom-flink:latest
>         imagePullPolicy: IfNotPresent
>         args: ["jobmanager"]
>         ports:
>         - containerPort: 6123
>           name: rpc
>         - containerPort: 6124
>           name: blob-server
>         - containerPort: 8081
>           name: webui
>         livenessProbe:
>           tcpSocket:
>             port: 6123
>           initialDelaySeconds: 30
>           periodSeconds: 60
>         volumeMounts:
>         - name: flink-config-volume
>           mountPath: /opt/flink/conf
>         - name: flink-staging
>           mountPath: /tmp/beam-artifact-staging
>         securityContext:
>           runAsUser: 9999
>         resources:
>           requests:
>             memory: "1Gi"
>             cpu: "1"
>           limits:
>             memory: "1Gi"
>             cpu: "1"
>       volumes:
>       - name: flink-config-volume
>         configMap:
>           name: flink-config
>           items:
>           - key: flink-conf.yaml
>             path: flink-conf.yaml
>           - key: log4j-console.properties
>             path: log4j-console.properties
>       - name: flink-staging
>         persistentVolumeClaim:
>           claimName: staging-artifacts-claim
> ---
> apiVersion: apps/v1
> kind: Deployment
> metadata:
>   name: flink-taskmanager
>   namespace: flink
> spec:
>   replicas: 1
>   selector:
>     matchLabels:
>       app: flink
>       component: taskmanager
>   template:
>     metadata:
>       labels:
>         app: flink
>         component: taskmanager
>     spec:
>       containers:
>       - name: taskmanager-beam-worker
>         image: custom-flink:latest
>         imagePullPolicy: IfNotPresent
>         args:
>         - /bin/bash
>         - -c
>         - "/opt/flink/bin/taskmanager.sh start-foreground & python -m apache_beam.runners.worker.worker_pool_main --container_executable=/opt/apache/beam/boot --service_port=50000 & tail -f /dev/null"
>         ports:
>         - containerPort: 6122
>           name: rpc
>         - containerPort: 6125
>           name: query-state
>         - containerPort: 50000
>           name: pool
>         livenessProbe:
>           tcpSocket:
>             port: 6122
>           initialDelaySeconds: 30
>           periodSeconds: 60
>         volumeMounts:
>         - name: flink-config-volume
>           mountPath: /opt/flink/conf/
>         - name: flink-staging
>           mountPath: /tmp/beam-artifact-staging
>         securityContext:
>           runAsUser: 9999
>         resources:
>           requests:
>             memory: "4Gi"
>             cpu: "4"
>           limits:
>             memory: "4Gi"
>             cpu: "4"
>       volumes:
>       - name: flink-config-volume
>         configMap:
>           name: flink-config
>           items:
>           - key: flink-conf.yaml
>             path: flink-conf.yaml
>           - key: log4j-console.properties
>             path: log4j-console.properties
>       - name: flink-staging
>         persistentVolumeClaim:
>           claimName: staging-artifacts-claim
> ---
> apiVersion: apps/v1
> kind: Deployment
> metadata:
>   name: beam-jobserver
>   namespace: flink
> spec:
>   replicas: 1
>   selector:
>     matchLabels:
>       app: beam
>       component: jobserver
>   template:
>     metadata:
>       labels:
>         app: beam
>         component: jobserver
>     spec:
>       containers:
>       - name: beam-jobserver
>         image: apache/beam_flink1.16_job_server:2.48.0
>         args: ["--flink-master=flink-jobmanager:8081"]
>         ports:
>         - containerPort: 8097
>         - containerPort: 8098
>         - containerPort: 8099
>         volumeMounts:
>         - name: beam-staging
>           mountPath: /tmp/beam-artifact-staging
>         resources:
>           requests:
>             memory: "2Gi"
>             cpu: "2"
>           limits:
>             memory: "2Gi"
>             cpu: "2"
>       volumes:
>       - name: beam-staging
>         persistentVolumeClaim:
>           claimName: staging-artifacts-claim
> ---
> apiVersion: v1
> kind: Service
> metadata:
>   name: beam-jobserver
>   namespace: flink
> spec:
>   type: ClusterIP
>   ports:
>   - name: grpc-port
>     port: 8097
>     targetPort: 8097
>   - name: expansion-port
>     port: 8098
>     targetPort: 8098
>   - name: job-manage-port
>     port: 8099
>     targetPort: 8099
>   selector:
>     app: beam
>     component: jobserver
>
> The PVC:
>
> apiVersion: v1
> kind: PersistentVolumeClaim
> metadata:
>   name: staging-artifacts-claim
>   namespace: flink
> spec:
>   accessModes:
>   - ReadWriteOnce
>   resources:
>     requests:
>       storage: 5Gi
>   storageClassName: standard
>
>
>
> Then I am running a Pod with apache/beam_python3.8_sdk:2.48.0,
> and installing Java in it because the expansion service requires it.
> Here is the code that runs from that container:
> ```
> import json
> import logging
>
> import apache_beam as beam
> from apache_beam.options.pipeline_options import PipelineOptions
> from apache_beam.io.kafka import ReadFromKafka, default_io_expansion_service
>
>
> def run_beam_pipeline():
>     logging.getLogger().setLevel(logging.INFO)
>
>     consumer_config = {
>         'bootstrap.servers': 'cluster-0-kafka-bootstrap.strimzi.svc.cluster.local:9092',
>         'group.id': 'beamgrouptest',
>         'auto.offset.reset': 'earliest',
>         'key.deserializer': 'org.apache.kafka.common.serialization.StringDeserializer',
>         'value.deserializer': 'org.apache.kafka.common.serialization.StringDeserializer',
>     }
>
>     topic = 'locations'
>
>     flink_options = PipelineOptions([
>         "--runner=PortableRunner",
>         "--artifact_endpoint=beam-jobserver:8098",
>         "--job_endpoint=beam-jobserver:8099",
>         "--environment_type=EXTERNAL",
>         "--environment_config=beam-worker-pool:50000",
>         # "--environment_config={\"command\": \"/opt/apache/beam/boot\"}",
>     ])
>
>     with beam.Pipeline(options=flink_options) as pipeline:
>         messages = (
>             pipeline
>             | "Read from Kafka" >> ReadFromKafka(
>                 consumer_config=consumer_config,
>                 topics=[topic],
>                 with_metadata=False,
>                 expansion_service=default_io_expansion_service(
>                     append_args=[
>                         '--defaultEnvironmentType=PROCESS',
>                         "--defaultEnvironmentConfig={\"command\": \"/opt/apache/beam/boot\"}",
>                     ]
>                 )
>             )
>             | "Print messages" >> beam.Map(print)
>         )
>
>     logging.info("Pipeline execution completed.")
>
>
> if __name__ == '__main__':
>     run_beam_pipeline()
> ```
>
> When starting the job, here are the logs; it downloads the Java expansion service.
>
>
> python3 testing.py
>
>
>
>
>
> <jemalloc>: MADV_DONTNEED does not work (memset will be used instead)
>
> <jemalloc>: (This is the expected behaviour if you are running under QEMU)
>
> INFO:apache_beam.utils.subprocess_server:Using cached job server jar from
> https://repo.maven.apache.org/maven2/org/apache/beam/beam-sdks-java-io-expansion-service/2.48.0/beam-sdks-java-io-expansion-service-2.48.0.jar
>
> INFO:root:Starting a JAR-based expansion service from JAR
> /root/.apache_beam/cache/jars/beam-sdks-java-io-expansion-service-2.48.0.jar
>
>
> INFO:apache_beam.utils.subprocess_server:Starting service with ['java'
> '-jar'
> '/root/.apache_beam/cache/jars/beam-sdks-java-io-expansion-service-2.48.0.jar'
> '35371'
> '--filesToStage=/root/.apache_beam/cache/jars/beam-sdks-java-io-expansion-service-2.48.0.jar'
> '--defaultEnvironmentType=PROCESS' '--defaultEnvironmentConfig={"command":
> "/opt/apache/beam/boot"}']
>
> INFO:apache_beam.utils.subprocess_server:Starting expansion service at
> localhost:35371
>
> INFO:apache_beam.utils.subprocess_server:Aug 09, 2023 8:35:00 AM
> org.apache.beam.sdk.expansion.service.ExpansionService
> loadRegisteredTransforms
>
> INFO:apache_beam.utils.subprocess_server:INFO: Registering external
> transforms: [beam:transform:org.apache.beam:kafka_read_with_metadata:v1,
> beam:transform:org.apache.beam:kafka_read_without_metadata:v1,
> beam:transform:org.apache.beam:kafka_write:v1,
> beam:external:java:generate_sequence:v1]
>
> INFO:apache_beam.utils.subprocess_server:
>
> INFO:apache_beam.utils.subprocess_server:Registered transforms:
>
> INFO:apache_beam.utils.subprocess_server: beam:transform:org.apache.beam:kafka_read_with_metadata:v1:
> org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$1@5c6648b0
>
> INFO:apache_beam.utils.subprocess_server: beam:transform:org.apache.beam:kafka_read_without_metadata:v1:
> org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$1@6f1de4c7
>
> INFO:apache_beam.utils.subprocess_server: beam:transform:org.apache.beam:kafka_write:v1:
> org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$1@459e9125
>
> INFO:apache_beam.utils.subprocess_server: beam:external:java:generate_sequence:v1:
> org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$1@128d2484
>
> INFO:apache_beam.utils.subprocess_server:
>
> INFO:apache_beam.utils.subprocess_server:Registered
> SchemaTransformProviders:
>
> INFO:apache_beam.utils.subprocess_server:
> beam:schematransform:org.apache.beam:kafka_read:v1
>
> INFO:apache_beam.utils.subprocess_server:
> beam:schematransform:org.apache.beam:kafka_write:v1
>
> WARNING:root:Waiting for grpc channel to be ready at localhost:35371.
>
>
>
>
> WARNING:root:Waiting for grpc channel to be ready at localhost:35371.
>
> WARNING:root:Waiting for grpc channel to be ready at localhost:35371.
>
> WARNING:root:Waiting for grpc channel to be ready at localhost:35371.
>
> INFO:apache_beam.utils.subprocess_server:Aug 09, 2023 8:35:06 AM
> org.apache.beam.sdk.expansion.service.ExpansionService expand
>
> INFO:apache_beam.utils.subprocess_server:INFO: Expanding 'Read from Kafka'
> with URN 'beam:transform:org.apache.beam:kafka_read_without_metadata:v1'
>
> INFO:apache_beam.utils.subprocess_server:Dependencies list: {}
>
> INFO:apache_beam.utils.subprocess_server:Aug 09, 2023 8:35:07 AM
> org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader
> payloadToConfig
>
> INFO:apache_beam.utils.subprocess_server:WARNING: Configuration class
> 'org.apache.beam.sdk.io.kafka.KafkaIO$Read$External$Configuration' has no
> schema registered. Attempting to construct with setter approach.
>
> INFO:apache_beam.utils.subprocess_server:Aug 09, 2023 8:35:08 AM
> org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader
> payloadToConfig
>
> INFO:apache_beam.utils.subprocess_server:WARNING: Configuration class
> 'org.apache.beam.sdk.io.kafka.KafkaIO$Read$External$Configuration' has no
> schema registered. Attempting to construct with setter approach.
>
> INFO:root:Default Python SDK image for environment is
> apache/beam_python3.8_sdk:2.48.0
>
> INFO:apache_beam.runners.portability.fn_api_runner.translations:====================
> <function pack_combiners at 0x402e7a95e0> ====================
>
> INFO:apache_beam.runners.portability.fn_api_runner.translations:====================
> <function lift_combiners at 0x402e7a9670> ====================
>
> INFO:apache_beam.runners.portability.fn_api_runner.translations:====================
> <function sort_stages at 0x402e7a9dc0> ====================
>
> INFO:apache_beam.runners.portability.portable_runner:Job state changed to
> STOPPED
>
> INFO:apache_beam.runners.portability.portable_runner:Job state changed to
> STARTING
>
> INFO:apache_beam.runners.portability.portable_runner:Job state changed to
> RUNNING
>
>
>
>
>
>
>
> *Error*
>
> 2023-08-09 10:25:37,146 INFO  org.apache.flink.configuration.GlobalConfiguration
>           [] - Loading configuration property: taskmanager.rpc.port, 6122
>
> 2023-08-09 10:25:37,260 INFO  org.apache.flink.runtime.taskmanager.Task
>                   [] - Source: Impulse -> [3]Read from
> Kafka/KafkaIO.Read/KafkaIO.Read.ReadFromKafkaViaSDF/{ParDo(GenerateKafkaSourceDescriptor),
> KafkaIO.ReadSourceDescriptors} (1/1)#0
> (3a42a4a4b7edf55899dc956496d8f99b_cbc357ccb763df2852fee8c4fc7d55f2_0_0)
> switched from INITIALIZING to RUNNING.
>
> 2023-08-09 10:25:37,724 INFO  org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder
> [] - Finished to build heap keyed state-backend.
>
> 2023-08-09 10:25:37,731 INFO
> org.apache.flink.runtime.state.heap.HeapKeyedStateBackend    [] -
> Initializing heap keyed state backend with stream factory.
>
> 2023-08-09 10:25:37,771 INFO  org.apache.beam.runners.fnexecution.control.FnApiControlClientPoolService
> [] - getProcessBundleDescriptor request with id 1-4
>
> 2023-08-09 10:25:37,876 INFO  /usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py:889
> [] - Creating insecure state channel for localhost:45429.
>
> 2023-08-09 10:25:37,877 INFO  /usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py:896
> [] - State channel established.
>
> 2023-08-09 10:25:37,918 INFO  /usr/local/lib/python3.8/site-packages/apache_beam/transforms/environments.py:376
> [] - Default Python SDK image for environment is
> apache/beam_python3.8_sdk:2.48.0
>
> 2023-08-09 10:25:37,928 ERROR
> /usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py:299
> [] - Error processing instruction 1. Original traceback is
>
> Traceback (most recent call last):
>
>   File
> "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py",
> line 295, in _execute
>
>     response = task()
>
>   File
> "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py",
> line 370, in <lambda>
>
>     lambda: self.create_worker().do_instruction(request), request)
>
>   File
> "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py",
> line 629, in do_instruction
>
>     return getattr(self, request_type)(
>
>   File
> "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py",
> line 660, in process_bundle
>
>     bundle_processor = self.bundle_processor_cache.get(
>
>   File
> "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py",
> line 491, in get
>
>     processor = bundle_processor.BundleProcessor(
>
>   File
> "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py",
> line 876, in __init__
>
>
> _verify_descriptor_created_in_a_compatible_env(process_bundle_descriptor)
>
>   File
> "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py",
> line 839, in _verify_descriptor_created_in_a_compatible_env
>
>     raise RuntimeError(
>
> *RuntimeError: Pipeline construction environment and pipeline runtime
> environment are not compatible. If you use a custom container image, check
> that the Python interpreter minor version and the Apache Beam version in
> your image match the versions used at pipeline construction time.
> Submission environment: beam:version:sdk_base:apache/beam_java8_sdk:2.48.0.
> Runtime environment:
> beam:version:sdk_base:apache/beam_python3.8_sdk:2.48.0.*
>
>
>
>
> 2023-08-09 10:25:37,931 INFO  org.apache.flink.runtime.taskmanager.Task
>                   [] - [3]Read from Kafka/{KafkaIO.Read, Remove Kafka
> Metadata} -> [1]Print messages (1/1)#0
> (3a42a4a4b7edf55899dc956496d8f99b_03f93075562d7d50bb0b07080b2ebe35_0_0)
> switched from INITIALIZING to RUNNING.
>
> 2023-08-09 10:28:37,784 WARN  org.apache.flink.runtime.taskmanager.Task
>                   [] - Source: Impulse -> [3]Read from
> Kafka/KafkaIO.Read/KafkaIO.Read.ReadFromKafkaViaSDF/{ParDo(GenerateKafkaSourceDescriptor),
> KafkaIO.ReadSourceDescriptors} (1/1)#0
> (3a42a4a4b7edf55899dc956496d8f99b_cbc357ccb763df2852fee8c4fc7d55f2_0_0)
> switched from RUNNING to FAILED with failure cause:
> java.lang.RuntimeException: Failed to start remote bundle
>
>
>
>
> Thanks
> kapil Dev
>
>
>

Re: Seeking Assistance to Resolve Issues/bug with Flink Runner on Kubernetes

Posted by Jan Lukavský <je...@seznam.cz>.
Hi Kapil,

if you don't have a special reason for running the jobserver manually,
you can let the Beam Python SDK run it for you (and configure it
accordingly). You just need to pass `--runner=flink` (and
--flink_master) to your flink_options (or via the command line). As Sam
suggested, it would be good to try running a minimal pipeline without
Kafka (which brings in the complexity of a cross-language pipeline) to
see whether the problem is with the jobserver or the expansion service.
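
For reference, a minimal pure-Python pipeline along these lines could look
like the sketch below. This is an illustration, not code from the thread:
the flink-jobmanager and beam-worker-pool endpoints are assumptions carried
over from the manifests quoted above.

```
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# Minimal sketch: no Kafka and no manually run job server. FlinkRunner
# lets the Python SDK start and configure a job server itself; the
# endpoints below are assumptions taken from the manifests in this thread.
options = PipelineOptions([
    "--runner=FlinkRunner",
    "--flink_master=flink-jobmanager:8081",
    "--environment_type=EXTERNAL",
    "--environment_config=beam-worker-pool:50000",
])

with beam.Pipeline(options=options) as p:
    (
        p
        | "Create" >> beam.Create(["hello", "beam"])
        | "Print" >> beam.Map(print)
    )
```

If a sketch like this runs, the job server and worker pool wiring is likely
fine and the problem sits in the cross-language Kafka expansion; if it fails
the same way, debug the runner setup first.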

I also have a working repo with k8s (minikube), flink and kafka - 
dockerfile: [1] (somewhat older versions), deployment: [2], example 
pipeline: [3].

Best,

  Jan


[1] 
https://github.com/PacktPublishing/Building-Big-Data-Pipelines-with-Apache-Beam/blob/main/env/docker/flink/Dockerfile

[2] 
https://github.com/PacktPublishing/Building-Big-Data-Pipelines-with-Apache-Beam/blob/main/env/manifests/flink.yaml

[3] 
https://github.com/PacktPublishing/Building-Big-Data-Pipelines-with-Apache-Beam/blob/main/chapter6/src/main/python/max_word_length.py

On 8/14/23 22:12, Sam Bourne wrote:
> Hey Kapil,
>
> I grappled with a similar deployment and created this repo 
> <https://github.com/sambvfx/beam-flink-k8s> [1] to attempt to provide 
> others with some nuggets of useful information. We were running cross 
> language pipelines on flink connecting PubsubIO 
> <https://github.com/apache/beam/blob/v2.48.0/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java#L171> [2] 
> to other misc python transforms. No promises it will help, but feel 
> free to take a look as it's a close approximation to the setup that we 
> had working.
>
> Your particular error seems related to the Kafka transform. Does a 
> pure python pipeline execute as expected?
>
> [1] https://github.com/sambvfx/beam-flink-k8s
> [2] 
> https://github.com/apache/beam/blob/v2.48.0/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java#L171
>
>
> On Mon, Aug 14, 2023 at 11:05 AM Daniel Chen via user 
> <us...@beam.apache.org> wrote:
>
>     Not the OP, but is it possible to join the slack channel without
>     an apache.org email address? I tried joining
>     slack previously for support and gave up because it looked like it
>     wasn't.

Re: Seeking Assistance to Resolve Issues/bug with Flink Runner on Kubernetes

Posted by Sam Bourne <sa...@gmail.com>.
Hey Kapil,

I grappled with a similar deployment and created this repo
<https://github.com/sambvfx/beam-flink-k8s> [1] to attempt to provide
others with some nuggets of useful information. We were running
cross-language pipelines on Flink connecting PubsubIO
<https://github.com/apache/beam/blob/v2.48.0/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java#L171>
[2]
to other misc. Python transforms. No promises it will help, but feel free to
take a look, as it's a close approximation to the setup that we had working.

Your particular error seems related to the Kafka transform. Does a pure
Python pipeline execute as expected?

[1] https://github.com/sambvfx/beam-flink-k8s
[2]
https://github.com/apache/beam/blob/v2.48.0/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java#L171
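
Along the lines of Sam's point about the Kafka transform, one variation
worth trying (an assumption, not something proposed in the thread) is to
point ReadFromKafka at an already-running expansion service instead of
letting it download and start a local JAR, so that expansion happens
against a known environment. The sketch below reuses `pipeline` and
`consumer_config` from Kapil's script; note that the Beam job server's
defaults put the expansion service on port 8097, even though the Service
manifest above labels 8098 as expansion-port, so the exact port needs
checking.

```
from apache_beam.io.kafka import ReadFromKafka

# Sketch only: passing an address string makes Beam use a running
# expansion service rather than auto-starting a local JAR. 8097 is the
# Beam job server's default expansion port (an assumption; verify it
# against the beam-jobserver deployment above).
records = (
    pipeline
    | "Read from Kafka" >> ReadFromKafka(
        consumer_config=consumer_config,
        topics=["locations"],
        expansion_service="beam-jobserver:8097",
    )
)
```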


On Mon, Aug 14, 2023 at 11:05 AM Daniel Chen via user <us...@beam.apache.org>
wrote:

> Not the OP, but is it possible to join the slack channel without an
> apache.org email address? I tried joining slack previously for support
> and gave up because it looked like it wasn't.
>
> On Mon, Aug 14, 2023 at 10:58 AM Kenneth Knowles <ke...@apache.org> wrote:
>
>> There is a slack channel linked from
>> https://beam.apache.org/community/contact-us/ it is #beam on
>> the-asf.slack.com
>>
>> (you find this via beam.apache.org -> Community -> Contact Us)
>>
>> It sounds like an issue with running a multi-language pipeline on the
>> portable flink runner. (something which I am not equipped to help with in
>> detail)
>>
>> Kenn
>>
>> On Wed, Aug 9, 2023 at 2:51 PM kapil singh <ka...@gmail.com>
>> wrote:
>>
>>> Hey,
>>>
>>> I've been grappling with this issue for the past five days and, despite
>>> my continuous efforts, I haven't found a resolution. Additionally, I've
>>> been unable to locate a Slack channel for Beam where I might seek
>>> assistance.
>>>
>>> issue
>>>
>>> *RuntimeError: Pipeline construction environment and pipeline runtime
>>> environment are not compatible. If you use a custom container image, check
>>> that the Python interpreter minor version and the Apache Beam version in
>>> your image match the versions used at pipeline construction time.
>>> Submission environment: beam:version:sdk_base:apache/beam_java8_sdk:2.48.0.
>>> Runtime environment:
>>> beam:version:sdk_base:apache/beam_python3.8_sdk:2.48.0.*
>>>
>>>
>>> Here what i am trying to do
>>>
>>>  i am running job from kubernetes container  that hits on job server and
>>> then job manager and task manager
>>> task manager and job manager is one Container
>>>
>>> Here is  My custom Dockerfile. name:custom-flink
>>>
>>> # Starting with the base Flink image
>>> FROM apache/flink:1.16-java11
>>> ARG FLINK_VERSION=1.16
>>> ARG KAFKA_VERSION=2.8.0
>>>
>>> # Install python3.8 and its associated dependencies, followed by pyflink
>>> RUN set -ex; \
>>> apt-get update && \
>>> apt-get install -y build-essential libssl-dev zlib1g-dev libbz2-dev
>>> libffi-dev lzma liblzma-dev && \
>>> wget https://www.python.org/ftp/python/3.8.0/Python-3.8.0.tgz && \
>>> tar -xvf Python-3.8.0.tgz && \
>>> cd Python-3.8.0 && \
>>> ./configure --without-tests --enable-shared && \
>>> make -j4 && \
>>> make install && \
>>> ldconfig /usr/local/lib && \
>>> cd .. && rm -f Python-3.8.0.tgz && rm -rf Python-3.8.0 && \
>>> ln -s /usr/local/bin/python3.8 /usr/local/bin/python && \
>>> ln -s /usr/local/bin/pip3.8 /usr/local/bin/pip && \
>>> apt-get clean && \
>>> rm -rf /var/lib/apt/lists/* && \
>>> python -m pip install --upgrade pip; \
>>> pip install apache-flink==${FLINK_VERSION}; \
>>> pip install kafka-python
>>>
>>> RUN pip install --no-cache-dir apache-beam[gcp]==2.48.0
>>>
>>> # Copy files from official SDK image, including script/dependencies.
>>> COPY --from=apache/beam_python3.8_sdk:2.48.0 /opt/apache/beam/
>>> /opt/apache/beam/
>>>
>>> # java SDK
>>> COPY --from=apache/beam_java11_sdk:2.48.0 /opt/apache/beam/
>>> /opt/apache/beam_java/
>>>
>>> RUN apt-get update && apt-get install -y python3-venv && rm -rf
>>> /var/lib/apt/lists/*
>>>
>>> # Give permissions to the /opt/apache/beam-venv directory
>>> RUN mkdir -p /opt/apache/beam-venv && chown -R 9999:9999
>>> /opt/apache/beam-venv
>>>
>>> Here is my Deployment file for Job manager,Task manager plus worker-pool
>>> and job server
>>>
>>>
>>> apiVersion: v1
>>> kind: Service
>>> metadata:
>>> name: flink-jobmanager
>>> namespace: flink
>>> spec:
>>> type: ClusterIP
>>> ports:
>>> - name: rpc
>>> port: 6123
>>> - name: blob-server
>>> port: 6124
>>> - name: webui
>>> port: 8081
>>> selector:
>>> app: flink
>>> component: jobmanager
>>> ---
>>> apiVersion: v1
>>> kind: Service
>>> metadata:
>>> name: beam-worker-pool
>>> namespace: flink
>>> spec:
>>> selector:
>>> app: flink
>>> component: taskmanager
>>> ports:
>>> - protocol: TCP
>>> port: 50000
>>> targetPort: 50000
>>> name: pool
>>> ---
>>> apiVersion: apps/v1
>>> kind: Deployment
>>> metadata:
>>> name: flink-jobmanager
>>> namespace: flink
>>> spec:
>>> replicas: 1
>>> selector:
>>> matchLabels:
>>> app: flink
>>> component: jobmanager
>>> template:
>>> metadata:
>>> labels:
>>> app: flink
>>> component: jobmanager
>>> spec:
>>> containers:
>>> - name: jobmanager
>>> image: custom-flink:latest
>>> imagePullPolicy: IfNotPresent
>>> args: ["jobmanager"]
>>> ports:
>>> - containerPort: 6123
>>> name: rpc
>>> - containerPort: 6124
>>> name: blob-server
>>> - containerPort: 8081
>>> name: webui
>>> livenessProbe:
>>> tcpSocket:
>>> port: 6123
>>> initialDelaySeconds: 30
>>> periodSeconds: 60
>>> volumeMounts:
>>> - name: flink-config-volume
>>> mountPath: /opt/flink/conf
>>> - name: flink-staging
>>> mountPath: /tmp/beam-artifact-staging
>>> securityContext:
>>> runAsUser: 9999
>>> resources:
>>> requests:
>>> memory: "1Gi"
>>> cpu: "1"
>>> limits:
>>> memory: "1Gi"
>>> cpu: "1"
>>> volumes:
>>> - name: flink-config-volume
>>> configMap:
>>> name: flink-config
>>> items:
>>> - key: flink-conf.yaml
>>> path: flink-conf.yaml
>>> - key: log4j-console.properties
>>> path: log4j-console.properties
>>> - name: flink-staging
>>> persistentVolumeClaim:
>>> claimName: staging-artifacts-claim
>>> ---
>>> apiVersion: apps/v1
>>> kind: Deployment
>>> metadata:
>>> name: flink-taskmanager
>>> namespace: flink
>>> spec:
>>> replicas: 1
>>> selector:
>>> matchLabels:
>>> app: flink
>>> component: taskmanager
>>> template:
>>> metadata:
>>> labels:
>>> app: flink
>>> component: taskmanager
>>> spec:
>>> containers:
>>> - name: taskmanager-beam-worker
>>> image: custom-flink:latest
>>> imagePullPolicy: IfNotPresent
>>> args:
>>> - /bin/bash
>>> - -c
>>> - "/opt/flink/bin/taskmanager.sh start-foreground & python -m
>>> apache_beam.runners.worker.worker_pool_main
>>> --container_executable=/opt/apache/beam/boot --service_port=50000 & tail -f
>>> /dev/null"
>>> ports:
>>> - containerPort: 6122
>>> name: rpc
>>> - containerPort: 6125
>>> name: query-state
>>> - containerPort: 50000
>>> name: pool
>>> livenessProbe:
>>> tcpSocket:
>>> port: 6122
>>> initialDelaySeconds: 30
>>> periodSeconds: 60
>>> volumeMounts:
>>> - name: flink-config-volume
>>> mountPath: /opt/flink/conf/
>>> - name: flink-staging
>>> mountPath: /tmp/beam-artifact-staging
>>> securityContext:
>>> runAsUser: 9999
>>> resources:
>>> requests:
>>> memory: "4Gi"
>>> cpu: "4"
>>> limits:
>>> memory: "4Gi"
>>> cpu: "4"
>>> volumes:
>>> - name: flink-config-volume
>>> configMap:
>>> name: flink-config
>>> items:
>>> - key: flink-conf.yaml
>>> path: flink-conf.yaml
>>> - key: log4j-console.properties
>>> path: log4j-console.properties
>>> - name: flink-staging
>>> persistentVolumeClaim:
>>> claimName: staging-artifacts-claim
>>> ---
>>> apiVersion: apps/v1
>>> kind: Deployment
>>> metadata:
>>> name: beam-jobserver
>>> namespace: flink
>>> spec:
>>> replicas: 1
>>> selector:
>>> matchLabels:
>>> app: beam
>>> component: jobserver
>>> template:
>>> metadata:
>>> labels:
>>> app: beam
>>> component: jobserver
>>> spec:
>>> containers:
>>> - name: beam-jobserver
>>> image: apache/beam_flink1.16_job_server:2.48.0
>>> args: ["--flink-master=flink-jobmanager:8081"]
>>> ports:
>>> - containerPort: 8097
>>> - containerPort: 8098
>>> - containerPort: 8099
>>> volumeMounts:
>>> - name: beam-staging
>>> mountPath: /tmp/beam-artifact-staging
>>> resources:
>>> requests:
>>> memory: "2Gi"
>>> cpu: "2"
>>> limits:
>>> memory: "2Gi"
>>> cpu: "2"
>>> volumes:
>>> - name: beam-staging
>>> persistentVolumeClaim:
>>> claimName: staging-artifacts-claim
>>> ---
>>> apiVersion: v1
>>> kind: Service
>>> metadata:
>>> name: beam-jobserver
>>> namespace: flink
>>> spec:
>>> type: ClusterIP
>>> ports:
>>> - name: grpc-port
>>> port: 8097
>>> targetPort: 8097
>>> - name: expansion-port
>>> port: 8098
>>> targetPort: 8098
>>> - name: job-manage-port
>>> port: 8099
>>> targetPort: 8099
>>> selector:
>>> app: beam
>>> component: jobserver
>>>
>>> pvc
>>>
>>> apiVersion: v1
>>> kind: PersistentVolumeClaim
>>> metadata:
>>> name: staging-artifacts-claim
>>> namespace: flink
>>> spec:
>>> accessModes:
>>> - ReadWriteOnce
>>> resources:
>>> requests:
>>> storage: 5Gi
>>> storageClassName: standard
>>>
>>>
>>>
>>> Then I am running a Pod  with apache/beam_python3.8_sdk:2.48.0.
>>> and installing java in it because expansion required to run the code
>>> here is my code that is running from above container
>>> ```
>>> import json
>>> import logging
>>>
>>> import apache_beam as beam
>>> from apache_beam.options.pipeline_options import PipelineOptions
>>> from apache_beam.io.kafka import ReadFromKafka, default_io_expansion_service
>>>
>>>
>>> def run_beam_pipeline():
>>>     logging.getLogger().setLevel(logging.INFO)
>>>
>>>     consumer_config = {
>>>         'bootstrap.servers': 'cluster-0-kafka-bootstrap.strimzi.svc.cluster.local:9092',
>>>         'group.id': 'beamgrouptest',
>>>         'auto.offset.reset': 'earliest',
>>>         'key.deserializer': 'org.apache.kafka.common.serialization.StringDeserializer',
>>>         'value.deserializer': 'org.apache.kafka.common.serialization.StringDeserializer',
>>>     }
>>>
>>>     topic = 'locations'
>>>
>>>     flink_options = PipelineOptions([
>>>         "--runner=PortableRunner",
>>>         "--artifact_endpoint=beam-jobserver:8098",
>>>         "--job_endpoint=beam-jobserver:8099",
>>>         "--environment_type=EXTERNAL",
>>>         "--environment_config=beam-worker-pool:50000",
>>>         # "--environment_config={\"command\": \"/opt/apache/beam/boot\"}",
>>>     ])
>>>
>>>     with beam.Pipeline(options=flink_options) as pipeline:
>>>         messages = (
>>>             pipeline
>>>             | "Read from Kafka" >> ReadFromKafka(
>>>                 consumer_config=consumer_config,
>>>                 topics=[topic],
>>>                 with_metadata=False,
>>>                 expansion_service=default_io_expansion_service(
>>>                     append_args=[
>>>                         '--defaultEnvironmentType=PROCESS',
>>>                         '--defaultEnvironmentConfig={"command": "/opt/apache/beam/boot"}',
>>>                     ]
>>>                 )
>>>             )
>>>             | "Print messages" >> beam.Map(print)
>>>         )
>>>
>>>     logging.info("Pipeline execution completed.")
>>>
>>>
>>> if __name__ == '__main__':
>>>     run_beam_pipeline()
>>>
>>> ```
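>>>
>>> (A hedged guess, not a confirmed fix: the custom image copies the Java SDK
>>> harness to /opt/apache/beam_java/, but the append_args above point the
>>> PROCESS environment at /opt/apache/beam/boot, which is the Python boot
>>> copied from beam_python3.8_sdk. A variant worth trying:)
>>>
>>> ```
>>> expansion_service=default_io_expansion_service(
>>>     append_args=[
>>>         '--defaultEnvironmentType=PROCESS',
>>>         # Java boot binary, per the COPY --from=apache/beam_java11_sdk:2.48.0
>>>         # line in the custom-flink Dockerfile above.
>>>         '--defaultEnvironmentConfig={"command": "/opt/apache/beam_java/boot"}',
>>>     ]
>>> )
>>> ```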
>>>
>>> When starting the job, here are the logs; it downloads and starts the Java
>>> expansion service.
>>>
>>>
>>> python3 testing.py
>>>
>>>
>>>
>>>
>>>
>>> <jemalloc>: MADV_DONTNEED does not work (memset will be used instead)
>>>
>>> <jemalloc>: (This is the expected behaviour if you are running under
>>> QEMU)
>>>
>>> INFO:apache_beam.utils.subprocess_server:Using cached job server jar
>>> from
>>> https://repo.maven.apache.org/maven2/org/apache/beam/beam-sdks-java-io-expansion-service/2.48.0/beam-sdks-java-io-expansion-service-2.48.0.jar
>>>
>>> INFO:root:Starting a JAR-based expansion service from JAR
>>> /root/.apache_beam/cache/jars/beam-sdks-java-io-expansion-service-2.48.0.jar
>>>
>>>
>>> INFO:apache_beam.utils.subprocess_server:Starting service with ['java'
>>> '-jar'
>>> '/root/.apache_beam/cache/jars/beam-sdks-java-io-expansion-service-2.48.0.jar'
>>> '35371'
>>> '--filesToStage=/root/.apache_beam/cache/jars/beam-sdks-java-io-expansion-service-2.48.0.jar'
>>> '--defaultEnvironmentType=PROCESS' '--defaultEnvironmentConfig={"command":
>>> "/opt/apache/beam/boot"}']
>>>
>>> INFO:apache_beam.utils.subprocess_server:Starting expansion service at
>>> localhost:35371
>>>
>>> INFO:apache_beam.utils.subprocess_server:Aug 09, 2023 8:35:00 AM
>>> org.apache.beam.sdk.expansion.service.ExpansionService
>>> loadRegisteredTransforms
>>>
>>> INFO:apache_beam.utils.subprocess_server:INFO: Registering external
>>> transforms: [beam:transform:org.apache.beam:kafka_read_with_metadata:v1,
>>> beam:transform:org.apache.beam:kafka_read_without_metadata:v1,
>>> beam:transform:org.apache.beam:kafka_write:v1,
>>> beam:external:java:generate_sequence:v1]
>>>
>>> INFO:apache_beam.utils.subprocess_server:
>>>
>>> INFO:apache_beam.utils.subprocess_server:Registered transforms:
>>>
>>> INFO:apache_beam.utils.subprocess_server: beam:transform:org.apache.beam:kafka_read_with_metadata:v1:
>>> org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$1@5c6648b0
>>>
>>> INFO:apache_beam.utils.subprocess_server: beam:transform:org.apache.beam:kafka_read_without_metadata:v1:
>>> org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$1@6f1de4c7
>>>
>>> INFO:apache_beam.utils.subprocess_server: beam:transform:org.apache.beam:kafka_write:v1:
>>> org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$1@459e9125
>>>
>>> INFO:apache_beam.utils.subprocess_server: beam:external:java:generate_sequence:v1:
>>> org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$1@128d2484
>>>
>>> INFO:apache_beam.utils.subprocess_server:
>>>
>>> INFO:apache_beam.utils.subprocess_server:Registered
>>> SchemaTransformProviders:
>>>
>>> INFO:apache_beam.utils.subprocess_server:
>>> beam:schematransform:org.apache.beam:kafka_read:v1
>>>
>>> INFO:apache_beam.utils.subprocess_server:
>>> beam:schematransform:org.apache.beam:kafka_write:v1
>>>
>>> WARNING:root:Waiting for grpc channel to be ready at localhost:35371.
>>>
>>>
>>>
>>>
>>> WARNING:root:Waiting for grpc channel to be ready at localhost:35371.
>>>
>>> WARNING:root:Waiting for grpc channel to be ready at localhost:35371.
>>>
>>> WARNING:root:Waiting for grpc channel to be ready at localhost:35371.
>>>
>>> INFO:apache_beam.utils.subprocess_server:Aug 09, 2023 8:35:06 AM
>>> org.apache.beam.sdk.expansion.service.ExpansionService expand
>>>
>>> INFO:apache_beam.utils.subprocess_server:INFO: Expanding 'Read from
>>> Kafka' with URN
>>> 'beam:transform:org.apache.beam:kafka_read_without_metadata:v1'
>>>
>>> INFO:apache_beam.utils.subprocess_server:Dependencies list: {}
>>>
>>> INFO:apache_beam.utils.subprocess_server:Aug 09, 2023 8:35:07 AM
>>> org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader
>>> payloadToConfig
>>>
>>> INFO:apache_beam.utils.subprocess_server:WARNING: Configuration class
>>> 'org.apache.beam.sdk.io.kafka.KafkaIO$Read$External$Configuration' has no
>>> schema registered. Attempting to construct with setter approach.
>>>
>>> INFO:apache_beam.utils.subprocess_server:Aug 09, 2023 8:35:08 AM
>>> org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader
>>> payloadToConfig
>>>
>>> INFO:apache_beam.utils.subprocess_server:WARNING: Configuration class
>>> 'org.apache.beam.sdk.io.kafka.KafkaIO$Read$External$Configuration' has no
>>> schema registered. Attempting to construct with setter approach.
>>>
>>> INFO:root:Default Python SDK image for environment is
>>> apache/beam_python3.8_sdk:2.48.0
>>>
>>> INFO:apache_beam.runners.portability.fn_api_runner.translations:====================
>>> <function pack_combiners at 0x402e7a95e0> ====================
>>>
>>> INFO:apache_beam.runners.portability.fn_api_runner.translations:====================
>>> <function lift_combiners at 0x402e7a9670> ====================
>>>
>>> INFO:apache_beam.runners.portability.fn_api_runner.translations:====================
>>> <function sort_stages at 0x402e7a9dc0> ====================
>>>
>>> INFO:apache_beam.runners.portability.portable_runner:Job state changed
>>> to STOPPED
>>>
>>> INFO:apache_beam.runners.portability.portable_runner:Job state changed
>>> to STARTING
>>>
>>> INFO:apache_beam.runners.portability.portable_runner:Job state changed
>>> to RUNNING
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>> *Error*
>>>
>>> 2023-08-09 10:25:37,146 INFO  org.apache.flink.configuration.GlobalConfiguration
>>>           [] - Loading configuration property: taskmanager.rpc.port,
>>> 6122
>>>
>>> 2023-08-09 10:25:37,260 INFO  org.apache.flink.runtime.taskmanager.Task
>>>                   [] - Source: Impulse -> [3]Read from
>>> Kafka/KafkaIO.Read/KafkaIO.Read.ReadFromKafkaViaSDF/{ParDo(GenerateKafkaSourceDescriptor),
>>> KafkaIO.ReadSourceDescriptors} (1/1)#0
>>> (3a42a4a4b7edf55899dc956496d8f99b_cbc357ccb763df2852fee8c4fc7d55f2_0_0)
>>> switched from INITIALIZING to RUNNING.
>>>
>>> 2023-08-09 10:25:37,724 INFO  org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder
>>> [] - Finished to build heap keyed state-backend.
>>>
>>> 2023-08-09 10:25:37,731 INFO
>>> org.apache.flink.runtime.state.heap.HeapKeyedStateBackend    [] -
>>> Initializing heap keyed state backend with stream factory.
>>>
>>> 2023-08-09 10:25:37,771 INFO  org.apache.beam.runners.fnexecution.control.FnApiControlClientPoolService
>>> [] - getProcessBundleDescriptor request with id 1-4
>>>
>>> 2023-08-09 10:25:37,876 INFO  /usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py:889
>>> [] - Creating insecure state channel for localhost:45429.
>>>
>>> 2023-08-09 10:25:37,877 INFO  /usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py:896
>>> [] - State channel established.
>>>
>>> 2023-08-09 10:25:37,918 INFO  /usr/local/lib/python3.8/site-packages/apache_beam/transforms/environments.py:376
>>> [] - Default Python SDK image for environment is
>>> apache/beam_python3.8_sdk:2.48.0
>>>
>>> 2023-08-09 10:25:37,928 ERROR
>>> /usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py:299
>>> [] - Error processing instruction 1. Original traceback is
>>>
>>> Traceback (most recent call last):
>>>
>>>   File
>>> "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py",
>>> line 295, in _execute
>>>
>>>     response = task()
>>>
>>>   File
>>> "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py",
>>> line 370, in <lambda>
>>>
>>>     lambda: self.create_worker().do_instruction(request), request)
>>>
>>>   File
>>> "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py",
>>> line 629, in do_instruction
>>>
>>>     return getattr(self, request_type)(
>>>
>>>   File
>>> "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py",
>>> line 660, in process_bundle
>>>
>>>     bundle_processor = self.bundle_processor_cache.get(
>>>
>>>   File
>>> "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py",
>>> line 491, in get
>>>
>>>     processor = bundle_processor.BundleProcessor(
>>>
>>>   File
>>> "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>> line 876, in __init__
>>>
>>>
>>> _verify_descriptor_created_in_a_compatible_env(process_bundle_descriptor)
>>>
>>>   File
>>> "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>> line 839, in _verify_descriptor_created_in_a_compatible_env
>>>
>>>     raise RuntimeError(
>>>
>>> *RuntimeError: Pipeline construction environment and pipeline runtime
>>> environment are not compatible. If you use a custom container image, check
>>> that the Python interpreter minor version and the Apache Beam version in
>>> your image match the versions used at pipeline construction time.
>>> Submission environment: beam:version:sdk_base:apache/beam_java8_sdk:2.48.0.
>>> Runtime environment:
>>> beam:version:sdk_base:apache/beam_python3.8_sdk:2.48.0.*
>>>
>>>
>>>
>>>
>>> 2023-08-09 10:25:37,931 INFO  org.apache.flink.runtime.taskmanager.Task
>>>                   [] - [3]Read from Kafka/{KafkaIO.Read, Remove Kafka
>>> Metadata} -> [1]Print messages (1/1)#0
>>> (3a42a4a4b7edf55899dc956496d8f99b_03f93075562d7d50bb0b07080b2ebe35_0_0)
>>> switched from INITIALIZING to RUNNING.
>>>
>>> 2023-08-09 10:28:37,784 WARN  org.apache.flink.runtime.taskmanager.Task
>>>                   [] - Source: Impulse -> [3]Read from
>>> Kafka/KafkaIO.Read/KafkaIO.Read.ReadFromKafkaViaSDF/{ParDo(GenerateKafkaSourceDescriptor),
>>> KafkaIO.ReadSourceDescriptors} (1/1)#0
>>> (3a42a4a4b7edf55899dc956496d8f99b_cbc357ccb763df2852fee8c4fc7d55f2_0_0)
>>> switched from RUNNING to FAILED with failure cause:
>>> java.lang.RuntimeException: Failed to start remote bundle
>>>
>>>
>>>
>>>
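>>> (One more hedged idea: the Flink job server itself also serves an expansion
>>> endpoint, by default on port 8097 — the port labeled grpc-port in the
>>> beam-jobserver Service above. Letting it expand the Kafka transform instead
>>> of spawning a local JAR would make the submission and runtime environments
>>> come from the same deployment. A sketch, assuming 8097 really is the
>>> expansion port in this image:)
>>>
>>> ```
>>> messages = (
>>>     pipeline
>>>     | "Read from Kafka" >> ReadFromKafka(
>>>         consumer_config=consumer_config,
>>>         topics=[topic],
>>>         with_metadata=False,
>>>         # expansion_service also accepts a host:port address string.
>>>         expansion_service="beam-jobserver:8097",
>>>     )
>>>     | "Print messages" >> beam.Map(print)
>>> )
>>> ```
>>>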
>>> Thanks
>>> kapil Dev
>>>
>>>
>>>
>>

Re: Seeking Assistance to Resolve Issues/bug with Flink Runner on Kubernetes

Posted by Daniel Chen via user <us...@beam.apache.org>.
Not the OP, but is it possible to join the Slack channel without an
apache.org email address? I tried joining Slack previously for support and
gave up because it looked like it wasn't possible.

On Mon, Aug 14, 2023 at 10:58 AM Kenneth Knowles <ke...@apache.org> wrote:

> There is a slack channel linked from
> https://beam.apache.org/community/contact-us/ it is #beam on
> the-asf.slack.com
>
> (you find this via beam.apache.org -> Community -> Contact Us)
>
> It sounds like an issue with running a multi-language pipeline on the
> portable flink runner. (something which I am not equipped to help with in
> detail)
>
> Kenn
>