You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2018/07/24 10:04:25 UTC

flink git commit: [FLINK-9765] [sql-client] Improve CLI responsiveness when cluster is not reachable

Repository: flink
Updated Branches:
  refs/heads/master b9a916afe -> 6022225a2


[FLINK-9765] [sql-client] Improve CLI responsiveness when cluster is not reachable

Moves the job cancellation into the final phase of the refresh thread in order to
keep the CLI responsive.

This closes #6265.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6022225a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6022225a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6022225a

Branch: refs/heads/master
Commit: 6022225a2b8ddbff7dc0bea9d5ecd55ce0031a9f
Parents: b9a916a
Author: Timo Walther <tw...@apache.org>
Authored: Thu Jul 5 13:25:28 2018 +0200
Committer: Timo Walther <tw...@apache.org>
Committed: Tue Jul 24 11:58:56 2018 +0200

----------------------------------------------------------------------
 .../flink/table/client/cli/CliResultView.java      | 17 +++++++++--------
 .../flink/table/client/gateway/Executor.java       |  3 ++-
 2 files changed, 11 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/6022225a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliResultView.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliResultView.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliResultView.java
index 9f893bb..df42edd 100644
--- a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliResultView.java
+++ b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliResultView.java
@@ -217,15 +217,7 @@ public abstract class CliResultView<O extends Enum<O>> extends CliView<O, Void>
 
 	@Override
 	protected void cleanUp() {
-		// stop retrieval
 		stopRetrieval();
-
-		// cancel table program
-		try {
-			client.getExecutor().cancelQuery(client.getContext(), resultDescriptor.getResultId());
-		} catch (SqlExecutionException e) {
-			// ignore further exceptions
-		}
 	}
 
 	// --------------------------------------------------------------------------------------------
@@ -285,6 +277,15 @@ public abstract class CliResultView<O extends Enum<O>> extends CliView<O, Void>
 					display();
 				}
 			}
+
+			// cancel table program
+			try {
+				// the cancellation happens in the refresh thread in order to keep the main thread
+				// responsive at all times, especially if the cluster is not reachable
+				client.getExecutor().cancelQuery(client.getContext(), resultDescriptor.getResultId());
+			} catch (SqlExecutionException e) {
+				// ignore further exceptions
+			}
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/6022225a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/Executor.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/Executor.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/Executor.java
index 7f903a4..3a4dd81 100644
--- a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/Executor.java
+++ b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/Executor.java
@@ -83,7 +83,8 @@ public interface Executor {
 	List<Row> retrieveResultPage(String resultId, int page) throws SqlExecutionException;
 
 	/**
-	 * Cancels a table program and stops the result retrieval.
+	 * Cancels a table program and stops the result retrieval. Blocks until the cancellation
+	 * command has been sent to the cluster.
 	 */
 	void cancelQuery(SessionContext session, String resultId) throws SqlExecutionException;