Posted to issues@flink.apache.org by "godfrey he (Jira)" <ji...@apache.org> on 2020/01/19 11:37:00 UTC
[jira] [Updated] (FLINK-15669) SQL client can't cancel flink job
[ https://issues.apache.org/jira/browse/FLINK-15669?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
godfrey he updated FLINK-15669:
-------------------------------
Description:
In the SQL client, the CLI client cancels a query through the {{void cancelQuery(String sessionId, String resultId)}} method in {{Executor}}. However, the {{resultId}} is a random UUID, not the job id, so the CLI client can't cancel a running job.
Related code in {{LocalExecutor}}:
{code:java}
private <C> ResultDescriptor executeQueryInternal(String sessionId, ExecutionContext<C> context, String query) {
    ......
    // store the result with a unique id
    final String resultId = UUID.randomUUID().toString();
    resultStore.storeResult(resultId, result);
    ......
    // create execution
    final ProgramDeployer deployer = new ProgramDeployer(
            configuration, jobName, pipeline);
    // start result retrieval
    result.startRetrieval(deployer);
    return new ResultDescriptor(
            resultId,
            removeTimeAttributes(table.getSchema()),
            result.isMaterialized());
}

private <T> void cancelQueryInternal(ExecutionContext<T> context, String resultId) {
    ......
    // stop Flink job
    try (final ClusterDescriptor<T> clusterDescriptor = context.createClusterDescriptor()) {
        ClusterClient<T> clusterClient = null;
        try {
            // retrieve existing cluster
            clusterClient = clusterDescriptor.retrieve(context.getClusterId()).getClusterClient();
            try {
                clusterClient.cancel(new JobID(StringUtils.hexStringToByte(resultId))).get();
            } catch (Throwable t) {
                // the job might have finished earlier
            }
        } catch (Exception e) {
            throw new SqlExecutionException("Could not retrieve or create a cluster.", e);
        } finally {
            try {
                if (clusterClient != null) {
                    clusterClient.close();
                }
            } catch (Exception e) {
                // ignore
            }
        }
    } catch (SqlExecutionException e) {
        throw e;
    } catch (Exception e) {
        throw new SqlExecutionException("Could not locate a cluster.", e);
    }
}
{code}
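To illustrate the mismatch described above, here is a minimal, self-contained sketch in plain Java with no Flink dependency. The names are assumptions made for illustration only: {{parseJobIdHex}} is a hypothetical stand-in for a strict 16-byte hex parser (it is not Flink's {{StringUtils.hexStringToByte}}), and {{JobIdRegistry}} is a hypothetical helper showing one possible direction for a fix, namely remembering the real job id for each result id. Neither is taken from the Flink code base.

```java
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

final class ResultIdVersusJobId {

    /** Hypothetical strict parser: accepts exactly 32 hex characters (16 bytes). */
    static byte[] parseJobIdHex(String hex) {
        if (hex.length() != 32) {
            throw new IllegalArgumentException("expected 32 hex characters, got " + hex.length());
        }
        byte[] bytes = new byte[16];
        for (int i = 0; i < bytes.length; i++) {
            int hi = Character.digit(hex.charAt(2 * i), 16);
            int lo = Character.digit(hex.charAt(2 * i + 1), 16);
            if (hi < 0 || lo < 0) {
                throw new IllegalArgumentException("non-hex character near index " + (2 * i));
            }
            bytes[i] = (byte) ((hi << 4) | lo);
        }
        return bytes;
    }

    /** Returns true only if the string can be parsed by parseJobIdHex. */
    static boolean isValidJobIdHex(String candidate) {
        try {
            parseJobIdHex(candidate);
            return true;
        } catch (IllegalArgumentException e) {
            return false;
        }
    }

    /** Hypothetical registry: remembers the job id that belongs to each result id. */
    static final class JobIdRegistry {
        private final Map<String, String> jobIdByResultId = new ConcurrentHashMap<>();

        void register(String resultId, String jobIdHex) {
            parseJobIdHex(jobIdHex); // reject malformed job ids at registration time
            jobIdByResultId.put(resultId, jobIdHex);
        }

        Optional<String> lookup(String resultId) {
            return Optional.ofNullable(jobIdByResultId.get(resultId));
        }
    }

    public static void main(String[] args) {
        String resultId = UUID.randomUUID().toString();

        // false: a UUID string is 36 characters long and contains hyphens
        System.out.println(isValidJobIdHex(resultId));

        // true: without hyphens it parses as hex, yet it is still an unrelated
        // random value and does not identify any submitted job
        System.out.println(isValidJobIdHex(resultId.replace("-", "")));

        // Possible direction: record the real job id when the job is submitted,
        // then resolve it from the result id when cancelling.
        JobIdRegistry registry = new JobIdRegistry();
        registry.register(resultId, "0123456789abcdef0123456789abcdef"); // made-up job id
        System.out.println(registry.lookup(resultId).isPresent());
    }
}
```

In the snippet from {{LocalExecutor}}, a failure while converting {{resultId}} would be thrown inside the innermost {{try}} and swallowed by {{catch (Throwable t)}}, which would explain why the cancel request appears to succeed while the job keeps running.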
was:
In the SQL client, the CLI client cancels a query through the {{void cancelQuery(String sessionId, String resultId)}} method in {{Executor}}. However, the {{resultId}} is a random UUID, not the job id, so the CLI client can't cancel a running job.
{code:java}
private <C> ResultDescriptor executeQueryInternal(String sessionId, ExecutionContext<C> context, String query) {
    ......
    // store the result with a unique id
    final String resultId = UUID.randomUUID().toString();
    resultStore.storeResult(resultId, result);
    ......
    // create execution
    final ProgramDeployer deployer = new ProgramDeployer(
            configuration, jobName, pipeline);
    // start result retrieval
    result.startRetrieval(deployer);
    return new ResultDescriptor(
            resultId,
            removeTimeAttributes(table.getSchema()),
            result.isMaterialized());
}

private <T> void cancelQueryInternal(ExecutionContext<T> context, String resultId) {
    ......
    // stop Flink job
    try (final ClusterDescriptor<T> clusterDescriptor = context.createClusterDescriptor()) {
        ClusterClient<T> clusterClient = null;
        try {
            // retrieve existing cluster
            clusterClient = clusterDescriptor.retrieve(context.getClusterId()).getClusterClient();
            try {
                clusterClient.cancel(new JobID(StringUtils.hexStringToByte(resultId))).get();
            } catch (Throwable t) {
                // the job might have finished earlier
            }
        } catch (Exception e) {
            throw new SqlExecutionException("Could not retrieve or create a cluster.", e);
        } finally {
            try {
                if (clusterClient != null) {
                    clusterClient.close();
                }
            } catch (Exception e) {
                // ignore
            }
        }
    } catch (SqlExecutionException e) {
        throw e;
    } catch (Exception e) {
        throw new SqlExecutionException("Could not locate a cluster.", e);
    }
}
{code}
> SQL client can't cancel flink job
> ---------------------------------
>
> Key: FLINK-15669
> URL: https://issues.apache.org/jira/browse/FLINK-15669
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Client
> Affects Versions: 1.10.0
> Reporter: godfrey he
> Priority: Major
> Fix For: 1.10.0
>
>
> In the SQL client, the CLI client cancels a query through the {{void cancelQuery(String sessionId, String resultId)}} method in {{Executor}}. However, the {{resultId}} is a random UUID, not the job id, so the CLI client can't cancel a running job.
> Related code in {{LocalExecutor}}:
> {code:java}
> private <C> ResultDescriptor executeQueryInternal(String sessionId, ExecutionContext<C> context, String query) {
>     ......
>     // store the result with a unique id
>     final String resultId = UUID.randomUUID().toString();
>     resultStore.storeResult(resultId, result);
>     ......
>     // create execution
>     final ProgramDeployer deployer = new ProgramDeployer(
>             configuration, jobName, pipeline);
>     // start result retrieval
>     result.startRetrieval(deployer);
>     return new ResultDescriptor(
>             resultId,
>             removeTimeAttributes(table.getSchema()),
>             result.isMaterialized());
> }
>
> private <T> void cancelQueryInternal(ExecutionContext<T> context, String resultId) {
>     ......
>     // stop Flink job
>     try (final ClusterDescriptor<T> clusterDescriptor = context.createClusterDescriptor()) {
>         ClusterClient<T> clusterClient = null;
>         try {
>             // retrieve existing cluster
>             clusterClient = clusterDescriptor.retrieve(context.getClusterId()).getClusterClient();
>             try {
>                 clusterClient.cancel(new JobID(StringUtils.hexStringToByte(resultId))).get();
>             } catch (Throwable t) {
>                 // the job might have finished earlier
>             }
>         } catch (Exception e) {
>             throw new SqlExecutionException("Could not retrieve or create a cluster.", e);
>         } finally {
>             try {
>                 if (clusterClient != null) {
>                     clusterClient.close();
>                 }
>             } catch (Exception e) {
>                 // ignore
>             }
>         }
>     } catch (SqlExecutionException e) {
>         throw e;
>     } catch (Exception e) {
>         throw new SqlExecutionException("Could not locate a cluster.", e);
>     }
> }
> {code}
--
This message was sent by Atlassian Jira
(v8.3.4#803005)