Posted to user-zh@flink.apache.org by 旧城以西 <dt...@163.com> on 2021/01/08 02:12:45 UTC
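How to specify the number of TaskManagers when submitting a job to Flink on K8s
Hi everyone,
I'm running a Flink session cluster on Kubernetes. The cluster has 3 TaskManagers, each with 8 slots. When I submit a job with parallelism 3, all of its tasks always end up on a single TaskManager, which causes data skew, and every checkpoint fails. Is there a parameter for Flink on K8s, similar to yarnslot on YARN, that controls how many TaskManagers a job uses?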
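The placement problem can be illustrated with a toy model. This is a hypothetical simplification written for illustration, not Flink's actual slot allocation code: a "pack" policy fills one TaskManager before touching the next, which matches the behaviour described above, while a "spread" policy always picks the least-loaded TaskManager that still has a free slot. Flink does have a cluster-level option, `cluster.evenly-spread-out-slots` (default `false`), that enables a strategy trying to spread slots evenly across all available TaskManagers; the sketch only shows why the two policies give different layouts for 3 TaskManagers x 8 slots and parallelism 3.

```java
import java.util.Arrays;

/**
 * Toy model (hypothetical, NOT Flink's SlotManager code) of placing
 * subtasks onto TaskManager slots under two different policies.
 */
public class SlotPlacement {

    static void checkCapacity(int taskManagers, int slotsPerTm, int parallelism) {
        if (parallelism > taskManagers * slotsPerTm) {
            throw new IllegalArgumentException("not enough slots for parallelism " + parallelism);
        }
    }

    /** "Pack": put each subtask on the first TaskManager that still has a free slot. */
    static int[] pack(int taskManagers, int slotsPerTm, int parallelism) {
        checkCapacity(taskManagers, slotsPerTm, parallelism);
        int[] used = new int[taskManagers];
        for (int subtask = 0; subtask < parallelism; subtask++) {
            for (int tm = 0; tm < taskManagers; tm++) {
                if (used[tm] < slotsPerTm) {
                    used[tm]++;
                    break;
                }
            }
        }
        return used;
    }

    /** "Spread": put each subtask on the least-used TaskManager that has a free slot. */
    static int[] spread(int taskManagers, int slotsPerTm, int parallelism) {
        checkCapacity(taskManagers, slotsPerTm, parallelism);
        int[] used = new int[taskManagers];
        for (int subtask = 0; subtask < parallelism; subtask++) {
            int best = -1;
            for (int tm = 0; tm < taskManagers; tm++) {
                if (used[tm] < slotsPerTm && (best == -1 || used[tm] < used[best])) {
                    best = tm;
                }
            }
            used[best]++;
        }
        return used;
    }

    public static void main(String[] args) {
        // Values from the post: 3 TaskManagers x 8 slots, job parallelism 3
        System.out.println("pack:   " + Arrays.toString(pack(3, 8, 3)));   // [3, 0, 0]
        System.out.println("spread: " + Arrays.toString(spread(3, 8, 3))); // [1, 1, 1]
    }
}
```

Under the packing policy all three subtasks share one TaskManager even though 21 slots sit idle elsewhere; under the spreading policy each TaskManager gets one subtask.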
TaskManager configuration:
jobmanager.rpc.address: flink-jobmanager-svc
jobmanager.rpc.port: 6123
jobmanager.heap.size: 1024m
taskmanager.heap.size: 1024m
taskmanager.numberOfTaskSlots: 8
parallelism.default: 1
high-availability: zookeeper
high-availability.cluster-id: 1
high-availability.storageDir: file:///tmp
high-availability.zookeeper.quorum: zookeeper:2181
zookeeper.sasl.disable: true
classloader.resolve-order: parent-first
blob.server.port: 6124
query.server.port: 6125
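A quick bit of arithmetic shows why one TaskManager is enough for this job. Assuming Flink's default slot sharing (all operators in one slot-sharing group), a job needs as many slots as its highest parallelism, so the smallest number of TaskManagers that can host it is ceil(p / s), where s is `taskmanager.numberOfTaskSlots`. The value that counts is the one in the TaskManagers' own configuration (8 here), since each TaskManager reads it from the configuration it starts with. This is roughly the lever that the YARN slots option provides: the number of TaskManagers a job spans follows from parallelism and slots per TaskManager. The class and method names below are illustrative only.

```java
/**
 * Worked arithmetic (assumes default slot sharing, so a job with
 * parallelism p needs p slots in total).
 */
public class MinTaskManagers {

    /** Smallest number of TaskManagers whose slots can hold p subtasks: ceil(p / s). */
    static int minTaskManagers(int parallelism, int slotsPerTm) {
        if (parallelism <= 0 || slotsPerTm <= 0) {
            throw new IllegalArgumentException("parallelism and slots must be positive");
        }
        return (parallelism + slotsPerTm - 1) / slotsPerTm; // integer ceiling division
    }

    public static void main(String[] args) {
        int parallelism = 3;
        System.out.println(minTaskManagers(parallelism, 8)); // 1: all 3 subtasks fit on one TaskManager
        System.out.println(minTaskManagers(parallelism, 1)); // 3: the job must span 3 TaskManagers
    }
}
```

The trade-off of dropping to 1 slot per TaskManager is capacity: 3 replicas x 1 slot gives exactly 3 slots, enough for this one job and nothing else.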
JobManager configuration:
jobmanager.rpc.address: flink-jobmanager-svc
jobmanager.rpc.port: 6123
jobmanager.heap.size: 1024m
taskmanager.heap.size: 1024m
taskmanager.numberOfTaskSlots: 1
parallelism.default: 1
high-availability: zookeeper
high-availability.zookeeper.path.root: /flink
high-availability.cluster-id: 1
high-availability.storageDir: file:///data/ha
high-availability.zookeeper.quorum: zookeeper:2181
zookeeper.sasl.disable: true
high-availability.jobmanager.port: 6123
state.backend: filesystem
state.backend.fs.checkpointdir: file:///data/state
web.upload.dir: /data/upload
blob.server.port: 6124
metrics.internal.query-service.port: 6125
classloader.resolve-order: parent-first
flink-taskmanager.yml
---
apiVersion: apps/v1beta1
kind: Deployment
metadata:
  name: flink-taskmanager
  namespace: kafka
spec:
  replicas: 3
  selector:
    matchLabels:
      app: flink-taskmanager
  template:
    metadata:
      labels:
        app: flink-taskmanager
    spec:
      containers:
      - image: ccr.ccs.tencentyun.com/jttest/flink:1.11.1-scala_2.12-java8
        name: flink-taskmanager
        ports:
        - containerPort: 8081
          name: flink-task
        workingDir: /opt/flink
        args:
        - taskmanager
        env:
        - name: JOB_MANAGER_RPC_ADDRESS
          value: flink-jobmanager-svc
        - name: TZ
          value: "Asia/Shanghai"
        resources:
          requests:
            cpu: 1200m
            memory: 1024Mi
          limits:
            cpu: 2000m
            memory: 2048Mi
        volumeMounts:
        - name: flink-taskmanager-pv
          mountPath: /opt/flink/conf
        - name: flink-jobmanager-lib-pv
          mountPath: /opt/flink/lib
      volumes:
      - name: flink-taskmanager-pv
        persistentVolumeClaim:
          claimName: flink-taskmanager-pvc
      - name: flink-jobmanager-lib-pv
        persistentVolumeClaim:
          claimName: flink-jobmanager-lib-pvc
      imagePullSecrets:
      - name: registrysecret
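Flink job code:
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Get the Flink execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Enable checkpointing every 60 s with exactly-once semantics
env.enableCheckpointing(60 * 1000);
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// Use event time; call this before setAutoWatermarkInterval(),
// because in Flink 1.11 it resets the watermark interval to 200 ms
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
// Generate periodic watermarks every 100 ms
env.getConfig().setAutoWatermarkInterval(100);
// Restart up to 5 times, waiting 1 minute between attempts
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(5, Time.minutes(1)));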
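In Flink 1.11 (the version in the image tag), `setStreamTimeCharacteristic(TimeCharacteristic.EventTime)` also sets the auto-watermark interval to 200 ms (and `ProcessingTime` sets it to 0), so the order of these two calls matters: a `setAutoWatermarkInterval(100)` made before it is silently overwritten. The sketch below is a hypothetical mock that only models this ordering behaviour with plain Java; `MockEnv` and `intervalAfter` are illustrative names, not Flink API.

```java
/**
 * Hypothetical mock of how Flink 1.11's setStreamTimeCharacteristic(...)
 * interacts with the auto-watermark interval. Not the Flink API.
 */
public class WatermarkIntervalOrder {

    enum TimeCharacteristic { ProcessingTime, IngestionTime, EventTime }

    static final class MockEnv {
        long autoWatermarkInterval = 0L;

        void setAutoWatermarkInterval(long ms) {
            autoWatermarkInterval = ms;
        }

        // Models the 1.11 behaviour: processing time resets the interval to 0,
        // event time and ingestion time reset it to 200 ms.
        void setStreamTimeCharacteristic(TimeCharacteristic c) {
            autoWatermarkInterval = (c == TimeCharacteristic.ProcessingTime) ? 0L : 200L;
        }
    }

    /** Returns the effective interval for the two possible call orders. */
    static long intervalAfter(boolean intervalFirst) {
        MockEnv env = new MockEnv();
        if (intervalFirst) {
            env.setAutoWatermarkInterval(100);
            env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        } else {
            env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
            env.setAutoWatermarkInterval(100);
        }
        return env.autoWatermarkInterval;
    }

    public static void main(String[] args) {
        System.out.println(intervalAfter(true));  // 200: the 100 ms setting is lost
        System.out.println(intervalAfter(false)); // 100: the setting survives
    }
}
```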