You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "godfrey he (Jira)" <ji...@apache.org> on 2020/01/19 11:37:00 UTC

[jira] [Updated] (FLINK-15669) SQL client can't cancel flink job

     [ https://issues.apache.org/jira/browse/FLINK-15669?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

godfrey he updated FLINK-15669:
-------------------------------
    Description: 
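In the SQL client, the CLI client cancels a query through the {{void cancelQuery(String sessionId, String resultId)}} method of {{Executor}}. However, {{resultId}} is a random UUID, not the job ID, so the CLI client cannot cancel a running job. {{cancelQueryInternal}} derives the job ID via {{new JobID(StringUtils.hexStringToByte(resultId))}}, which does not identify the submitted job. Any error from that call is silently swallowed by the {{catch (Throwable t)}} block.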


Related code in {{LocalExecutor}}:
{code:java}
	private <C> ResultDescriptor executeQueryInternal(String sessionId, ExecutionContext<C> context, String query) {
		......

		// store the result with a unique id
		final String resultId = UUID.randomUUID().toString();
		resultStore.storeResult(resultId, result);

	       ......

		// create execution
		final ProgramDeployer deployer = new ProgramDeployer(
				configuration, jobName, pipeline);

		// start result retrieval
		result.startRetrieval(deployer);

		return new ResultDescriptor(
				resultId,
				removeTimeAttributes(table.getSchema()),
				result.isMaterialized());
	}

private <T> void cancelQueryInternal(ExecutionContext<T> context, String resultId) {
		......

		// stop Flink job
		try (final ClusterDescriptor<T> clusterDescriptor = context.createClusterDescriptor()) {
			ClusterClient<T> clusterClient = null;
			try {
				// retrieve existing cluster
				clusterClient = clusterDescriptor.retrieve(context.getClusterId()).getClusterClient();
				try {
					clusterClient.cancel(new JobID(StringUtils.hexStringToByte(resultId))).get();
				} catch (Throwable t) {
					// the job might has finished earlier
				}
			} catch (Exception e) {
				throw new SqlExecutionException("Could not retrieve or create a cluster.", e);
			} finally {
				try {
					if (clusterClient != null) {
						clusterClient.close();
					}
				} catch (Exception e) {
					// ignore
				}
			}
		} catch (SqlExecutionException e) {
			throw e;
		} catch (Exception e) {
			throw new SqlExecutionException("Could not locate a cluster.", e);
		}
	}
{code}





  was:
In the SQL client, the CLI client cancels a query through the {{void cancelQuery(String sessionId, String resultId)}} method of {{Executor}}. However, {{resultId}} is a random UUID, not the job ID, so the CLI client cannot cancel a running job.


{code:java}
	private <C> ResultDescriptor executeQueryInternal(String sessionId, ExecutionContext<C> context, String query) {
		......

		// store the result with a unique id
		final String resultId = UUID.randomUUID().toString();
		resultStore.storeResult(resultId, result);

	       ......

		// create execution
		final ProgramDeployer deployer = new ProgramDeployer(
				configuration, jobName, pipeline);

		// start result retrieval
		result.startRetrieval(deployer);

		return new ResultDescriptor(
				resultId,
				removeTimeAttributes(table.getSchema()),
				result.isMaterialized());
	}

private <T> void cancelQueryInternal(ExecutionContext<T> context, String resultId) {
		......

		// stop Flink job
		try (final ClusterDescriptor<T> clusterDescriptor = context.createClusterDescriptor()) {
			ClusterClient<T> clusterClient = null;
			try {
				// retrieve existing cluster
				clusterClient = clusterDescriptor.retrieve(context.getClusterId()).getClusterClient();
				try {
					clusterClient.cancel(new JobID(StringUtils.hexStringToByte(resultId))).get();
				} catch (Throwable t) {
					// the job might has finished earlier
				}
			} catch (Exception e) {
				throw new SqlExecutionException("Could not retrieve or create a cluster.", e);
			} finally {
				try {
					if (clusterClient != null) {
						clusterClient.close();
					}
				} catch (Exception e) {
					// ignore
				}
			}
		} catch (SqlExecutionException e) {
			throw e;
		} catch (Exception e) {
			throw new SqlExecutionException("Could not locate a cluster.", e);
		}
	}
{code}






> SQL client can't cancel flink job
> ---------------------------------
>
>                 Key: FLINK-15669
>                 URL: https://issues.apache.org/jira/browse/FLINK-15669
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Client
>    Affects Versions: 1.10.0
>            Reporter: godfrey he
>            Priority: Major
>             Fix For: 1.10.0
>
>
> In the SQL client, the CLI client cancels a query through the {{void cancelQuery(String sessionId, String resultId)}} method of {{Executor}}. However, {{resultId}} is a random UUID, not the job ID, so the CLI client cannot cancel a running job.
> Related code in {{LocalExecutor}}:
> {code:java}
> 	private <C> ResultDescriptor executeQueryInternal(String sessionId, ExecutionContext<C> context, String query) {
> 		......
> 		// store the result with a unique id
> 		final String resultId = UUID.randomUUID().toString();
> 		resultStore.storeResult(resultId, result);
> 	       ......
> 		// create execution
> 		final ProgramDeployer deployer = new ProgramDeployer(
> 				configuration, jobName, pipeline);
> 		// start result retrieval
> 		result.startRetrieval(deployer);
> 		return new ResultDescriptor(
> 				resultId,
> 				removeTimeAttributes(table.getSchema()),
> 				result.isMaterialized());
> 	}
> private <T> void cancelQueryInternal(ExecutionContext<T> context, String resultId) {
> 		......
> 		// stop Flink job
> 		try (final ClusterDescriptor<T> clusterDescriptor = context.createClusterDescriptor()) {
> 			ClusterClient<T> clusterClient = null;
> 			try {
> 				// retrieve existing cluster
> 				clusterClient = clusterDescriptor.retrieve(context.getClusterId()).getClusterClient();
> 				try {
> 					clusterClient.cancel(new JobID(StringUtils.hexStringToByte(resultId))).get();
> 				} catch (Throwable t) {
> 					// the job might has finished earlier
> 				}
> 			} catch (Exception e) {
> 				throw new SqlExecutionException("Could not retrieve or create a cluster.", e);
> 			} finally {
> 				try {
> 					if (clusterClient != null) {
> 						clusterClient.close();
> 					}
> 				} catch (Exception e) {
> 					// ignore
> 				}
> 			}
> 		} catch (SqlExecutionException e) {
> 			throw e;
> 		} catch (Exception e) {
> 			throw new SqlExecutionException("Could not locate a cluster.", e);
> 		}
> 	}
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)