You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user-zh@flink.apache.org by 小学生 <20...@qq.com> on 2020/10/09 06:18:44 UTC
FlinkJobNotFoundException错误
请教一个问题:从一个mysql表到另一个MySQL的操作,单机linux下运行,出现如下错误:
py4j.protocol.Py4JJavaError: An error occurred while calling o6.execute.
: java.util.concurrent.ExecutionException: org.apache.flink.runtime.rest.util.RestClientException: [org.apache.flink.runtime.rest.handler.RestHandlerException: org.apache.flink.runtime.messages.FlinkJobNotFoundException: Could not find Flink job (4d6b1273229e0e16fa433c652b5cb74d)
代码如下:
from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic, CheckpointingMode
from pyflink.table import StreamTableEnvironment, EnvironmentSettings, DataTypes
batch_source_ddl = """
CREATE TABLE mh_source_tab (
lid INT,
dir INT,
posid BIGINT,
km BIGINT,
poleId BIGINT,
ts BIGINT,
rt Decimal(6,2),
time1 VARCHAR
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://**',
'table-name' = 'nj_mh_test', --基础数据表
'driver' = 'com.mysql.cj.jdbc.Driver',
'username' = 'root',
'password' = 'root',
--'sink.buffer-flush.max-rows' = '100',
'sink.buffer-flush.interval' = '1s'
)
warn_alarm_mh_ddl = """
CREATE TABLE warn_alarm_mh_sink (
lid INT,
dir INT,
posid BIGINT,
km BIGINT,
poleId BIGINT,
extremum Decimal(6,2),
PRIMARY KEY (lid,dir,posid,poleId) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://**',
'table-name' = 'warn_mh_alarm_result', --结果数据表
'driver' = 'com.mysql.cj.jdbc.Driver',
'username' = 'root',
'password' = 'root',
--'sink.buffer-flush.max-rows' = '100',
'sink.buffer-flush.interval' = '1s'
)
"""
env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)
env_settings = EnvironmentSettings.new_instance().use_blink_planner().in_streaming_mode().build()
t_env = StreamTableEnvironment.create(env, environment_settings=env_settings)
t_env.execute_sql(mh_source_tab)
t_env.execute_sql(warn_alarm_mh_ddl)
def threshold_alarm(delta_thres):
source = t_env.from_path("mh_source_tab") \
.where("rt <> -10000")\
.group_by("lid, dir, posid, km, poleId")\
.select("lid, dir, posid, km, poleId, max(rt) as max_rt, min(rt) as min_rt, max(rt)-min(rt) as extremum")\
.where("extremum >"+str(delta_thres))\
.select("lid, dir, posid, km, poleId, extremum")
source.execute_insert("warn_alarm_mh_sink") \
.get_job_client() \
.get_job_execution_result() \
.result()
if __name__ == '__main__':
threshold_alarm(delta_thres=0.5)