Posted to issues@flink.apache.org by "Yang Wang (Jira)" <ji...@apache.org> on 2022/04/21 03:00:00 UTC

[jira] [Commented] (FLINK-27328) Could not resolve ResourceManager address

    [ https://issues.apache.org/jira/browse/FLINK-27328?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17525391#comment-17525391 ] 

Yang Wang commented on FLINK-27328:
-----------------------------------

If you are using {{"--host", "$POD_IP"}} to bind the JobManager RPC address to the pod IP, then the TaskManager should also use the pod IP (not the service name "flink-jobmanager") to connect to the JobManager.
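
As a rough illustration of what this means for the manifests quoted below, here are two untested fragments. Both are sketches under stated assumptions, not a verified fix. Option A assumes that dropping "--host" leaves the JobManager on the jobmanager.rpc.address from the ConfigMap, which is what the comment above the JobManager args in the issue implies. Option B uses JOBMANAGER_POD_IP, a hypothetical environment variable that none of the quoted manifests define.

```yaml
# Option A (JobManager container): omit "--host" so the JobManager keeps the
# jobmanager.rpc.address from flink-conf.yaml (jobmanager-hs in the ConfigMap),
# and the TaskManager can keep connecting through the service name.
args: ["standalone-job", "--job-classname", "org.apache.flink.application.Main"]
---
# Option B (TaskManager container): keep "--host" on the JobManager and point
# the TaskManager at the JobManager pod IP instead of the service name.
# JOBMANAGER_POD_IP is hypothetical; how it gets populated is not covered here.
args: ["taskmanager", "-Djobmanager.rpc.address=$(JOBMANAGER_POD_IP)", "-Dtaskmanager.host=$(K8S_POD_IP)"]
```

Kubernetes itself only expands the $(VAR_NAME) form inside command and args, and only for variables declared in the same container's env. A bare $POD_IP is passed through literally unless a shell expands it later.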

> Could not resolve ResourceManager address
> -----------------------------------------
>
>                 Key: FLINK-27328
>                 URL: https://issues.apache.org/jira/browse/FLINK-27328
>             Project: Flink
>          Issue Type: Bug
>          Components: Kubernetes Operator
>    Affects Versions: 1.14.4
>         Environment: h3. JobManager
> {{apiVersion: v1
> kind: Service
> metadata:
>   name: jobmanager-cs
> spec:
>   type: NodePort
>   ports:
>   - name: ui
>     port: 8081
>   selector: 
>     app: flink
>     component: jobmanager
> ---
> apiVersion: v1
> kind: Service
> metadata:
>   name: jobmanager-hs
> spec:
>   type: ClusterIP
>   ports:
>     - port: 6123
>       name: rpc
>     - port: 6124
>       name: blob-server
>     - port: 6125
>       name: query
>   selector: 
>     app: flink
>     component: jobmanager
> ---
> apiVersion: apps/v1
> kind: Deployment
> metadata:
>   name: flink-jobmanager
> spec:
>   selector:
>     matchLabels:
>       app: flink
>   template:
>     metadata:
>       labels:
>         app: flink
>         component: jobmanager
>     spec:
>       restartPolicy: Always
>       containers:
>         - name: jobmanager
>           image: flink:1.13.1-scala_2.12
>           command: [bash,"-ec",bin/jobmanager.sh start-foreground cluster]
>           resources:
>             limits:
>               memory: "2024Mi"
>               cpu: "500m"
>           env:
>           - name: JOB_MANAGER_ID
>             valueFrom:
>               fieldRef:
>                 apiVersion: v1
>                 fieldPath: status.podIP
>           - name: POD_IP
>             valueFrom:
>               fieldRef:
>                 apiVersion: v1
>                 fieldPath: status.podIP
>           # The following args overwrite the value of jobmanager.rpc.address configured in the configuration config map to POD_IP.
>           args: ["standalone-job", "--host", "$POD_IP", "--job-classname", "org.apache.flink.application.Main"] #, <optional arguments>, <job arguments>]  optional arguments: ["--job-id", "<job id>", "--fromSavepoint", "/path/to/savepoint", "--allowNonRestoredState"]
>           ports:
>             - containerPort: 6123
>               name: rpc
>             - containerPort: 6124
>               name: blob-server
>             - containerPort: 6125
>               name: query
>             - containerPort: 8081
>               name: webui
>           volumeMounts:
>             - name: flink-config-volume
>               mountPath: /opt/flink/conf
>             - name: job-artifacts-volume
>               mountPath: /opt/flink/usrlib
>           securityContext:
>             runAsUser: 9999 
>       volumes:
>         - name: flink-config-volume
>           configMap:
>             name: flink-config
>             items:
>               - key: flink-conf.yaml
>                 path: flink-conf.yaml
>               - key: log4j-console.properties
>                 path: log4j-console.properties
>         - name: job-artifacts-volume
>           hostPath:
>             path: /config/flink}}
> h3. Task Manager
> {{apiVersion: apps/v1
> kind: Deployment
> metadata:
>   name: flink-taskmanager
> spec:
>   replicas: 2
>   selector:
>     matchLabels:
>       app: flink
>       component: taskmanager
>   template:
>     metadata:
>       labels:
>         app: flink
>         component: taskmanager
>     spec:
>       containers:
>       - name: taskmanager
>         image: flink:1.13.1-scala_2.12
>         env:
>           - name: K8S_POD_IP
>             valueFrom:
>               fieldRef:
>                 fieldPath: status.podIP
>         command: ["/bin/sh", "-ec", "sleep 1000"]
>         resources:
>           limits:
>             memory: "800Mi"
>             cpu: "2000m"
>         args: ["taskmanager","start-foreground","-Dtaskmanager.host=$K8S_POD_IP"]
>         ports:
>         - containerPort: 6122
>           name: rpc
>         - containerPort: 6125
>           name: query-state
>         volumeMounts:
>         - name: flink-config-volume
>           mountPath: /opt/flink/conf/
>         - name: job-artifacts-volume
>           mountPath: /opt/flink/usrlib
>         securityContext:
>           runAsUser: 9999  
>       volumes:
>       - name: flink-config-volume
>         configMap:
>           name: flink-config
>           items:
>           - key: flink-conf.yaml
>             path: flink-conf.yaml
>           - key: log4j-console.properties
>             path: log4j-console.properties
>       - name: job-artifacts-volume
>         hostPath:
>           path: /config/flink}}
> h3. ConfigMap
> {{apiVersion: v1
> kind: ConfigMap
> metadata:
>   name: flink-config
>   labels:
>     app: flink
> data:
>   flink-conf.yaml: |+
>     jobmanager.rpc.address: jobmanager-hs
>     taskmanager.numberOfTaskSlots: 1
>     blob.server.port: 6124
>     jobmanager.rpc.port: 6123
>     taskmanager.rpc.port: 6122
>     taskmanager.heap.size: 1024m
>     jobmanager.heap.size: 1024m
>     state.backend: filesystem
>     s3.access-key: k8sdemo
>     s3.secret-key: k8sdemo123
>     state.checkpoints.dir: /opt/flink/usrlib/checkpoints
>     state.savepoints.dir: /opt/flink/usrlib/savepoints
>     metrics.reporters: prom
>     metrics.reporter.prom.class: org.apache.flink.metrics.prometheus.PrometheusReporter
>     metrics.reporter.prom.port: 9249
>     queryable-state.proxy.ports: 6125
>     jobmanager.memory.process.size: 1600m
>     taskmanager.memory.process.size: 1728m
>     parallelism.default: 1
>     rest.address: 0.0.0.0
>     rest.bind-address: 0.0.0.0
>     jobmanager.execution.failover-strategy: region
>   log4j-console.properties: |+
>     # This affects logging for both user code and Flink
>     rootLogger.level = DEBUG
>     rootLogger.appenderRef.console.ref = ConsoleAppender
>     rootLogger.appenderRef.rolling.ref = RollingFileAppender
>     # Uncomment this if you want to _only_ change Flink's logging
>     #logger.flink.name = org.apache.flink
>     #logger.flink.level = DEBUG
>     # The following lines keep the log level of common libraries/connectors on
>     # log level INFO. The root logger does not override this. You have to manually
>     # change the log levels here.
>     logger.akka.name = akka
>     logger.akka.level = DEBUG
>     logger.kafka.name= org.apache.kafka
>     logger.kafka.level = DEBUG
>     logger.hadoop.name = org.apache.hadoop
>     logger.hadoop.level = DEBUG
>     logger.zookeeper.name = org.apache.zookeeper
>     logger.zookeeper.level = DEBUG
>     # Log all infos to the console
>     appender.console.name = ConsoleAppender
>     appender.console.type = CONSOLE
>     appender.console.layout.type = PatternLayout
>     appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
>     # Log all infos in the given rolling file
>     appender.rolling.name = RollingFileAppender
>     appender.rolling.type = RollingFile
>     appender.rolling.append = false
>     appender.rolling.fileName = ${sys:log.file}
>     appender.rolling.filePattern = ${sys:log.file}.%i
>     appender.rolling.layout.type = PatternLayout
>     appender.rolling.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
>     appender.rolling.policies.type = Policies
>     appender.rolling.policies.size.type = SizeBasedTriggeringPolicy
>     appender.rolling.policies.size.size=100MB
>     appender.rolling.strategy.type = DefaultRolloverStrategy
>     appender.rolling.strategy.max = 10
>     # Suppress the irrelevant (wrong) warnings from the Netty channel handler
>     logger.netty.name = org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline
>     logger.netty.level = OFF  }}
>            Reporter: Viswanath Shanmugam
>            Priority: Not a Priority
>
> When I try to run the TaskManager with the following command
> {quote}bin/taskmanager start-foreground -Dtaskmanager.host=$K8S_POD_IP
> {quote}
> I get the following exceptions.
> JobManager:
> {quote}2021-08-27 09:16:57,917 ERROR akka.remote.EndpointWriter [] - dropping message [class akka.actor.ActorSelectionMessage] for non-local recipient [Actor[akka.tcp://flink@jobmanager-hs:6123/]] arriving at [akka.tcp://flink@jobmanager-hs:6123] inbound addresses are [akka.tcp://flink@cluster:6123]
> 2021-08-27 09:17:01,255 DEBUG org.apache.flink.runtime.resourcemanager.StandaloneResourceManager [] - Trigger heartbeat request.
> 2021-08-27 09:17:01,284 DEBUG org.apache.flink.runtime.resourcemanager.StandaloneResourceManager [] - Trigger heartbeat request.
> 2021-08-27 09:17:10,008 DEBUG akka.remote.transport.netty.NettyTransport [] - Remote connection to [/172.17.0.1:34827] was disconnected because of [id: 0x13ae1d03, /172.17.0.1:34827 :> /172.17.0.23:6123] DISCONNECTED
> 2021-08-27 09:17:10,008 DEBUG akka.remote.transport.ProtocolStateActor [] - Association between local [tcp://flink@cluster:6123] and remote [tcp://flink@172.17.0.1:34827] was disassociated because the ProtocolStateActor failed: Unknown
> 2021-08-27 09:17:10,009 WARN akka.remote.ReliableDeliverySupervisor [] - Association with remote system [akka.tcp://flink@172.17.0.24:6122] has failed, address is now gated for [50] ms. Reason: [Disassociated]
> {quote}
> TaskManager:
> {quote}INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Could not resolve ResourceManager address akka.tcp://flink@flink-jobmanager:6123/user/rpc/resourcemanager_{_}, retrying in 10000 ms: Could not connect to rpc endpoint under address akka.tcp://flink@flink-jobmanager:6123/user/rpc/resourcemanager_{_}.
> INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Could not resolve ResourceManager address akka.tcp://flink@flink-jobmanager:6123/user/rpc/resourcemanager_{_}, retrying in 10000 ms: Could not connect to rpc endpoint under address akka.tcp://flink@flink-jobmanager:6123/user/rpc/resourcemanager_{_}.
> {quote}


