You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2018/04/25 15:45:21 UTC

flink git commit: [FLINK-8686] [sql-client] Improve basic embedded SQL client

Repository: flink
Updated Branches:
  refs/heads/master 063aeb17d -> fdfce98ab


[FLINK-8686] [sql-client] Improve basic embedded SQL client

- Fix stale cached session properties by copying the mutable session context
- Sort the table source properties listed in exception messages
- Add logging for exceptions and cluster communication
- Show the valid page range in the go-to-page prompt
- Fix highlighting during result refresh

This closes #5867.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fdfce98a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fdfce98a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fdfce98a

Branch: refs/heads/master
Commit: fdfce98ab741cb2591407c425c75cd0a283401a6
Parents: 063aeb1
Author: Timo Walther <tw...@apache.org>
Authored: Mon Apr 16 13:40:53 2018 +0200
Committer: Timo Walther <tw...@apache.org>
Committed: Wed Apr 25 17:43:50 2018 +0200

----------------------------------------------------------------------
 .../apache/flink/table/client/SqlClient.java    |  8 ++++++++
 .../flink/table/client/cli/CliClient.java       |  5 +++++
 .../table/client/cli/CliTableResultView.java    | 14 ++++++++++++--
 .../flink/table/client/config/Environment.java  | 18 ++++++++++++++++++
 .../table/client/gateway/SessionContext.java    |  6 ++++++
 .../client/gateway/local/ExecutionContext.java  |  2 +-
 .../client/gateway/local/LocalExecutor.java     | 20 +++++++++++++++++++-
 .../gateway/local/LocalExecutorITCase.java      |  4 ++++
 .../org/apache/flink/table/api/exceptions.scala |  4 ++--
 .../descriptors/DescriptorProperties.scala      |  7 +++++++
 10 files changed, 82 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/fdfce98a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/SqlClient.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/SqlClient.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/SqlClient.java
index 82ef4dd..65d986c 100644
--- a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/SqlClient.java
+++ b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/SqlClient.java
@@ -26,6 +26,9 @@ import org.apache.flink.table.client.gateway.Executor;
 import org.apache.flink.table.client.gateway.SessionContext;
 import org.apache.flink.table.client.gateway.local.LocalExecutor;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.io.IOException;
 import java.net.URL;
 import java.util.Arrays;
@@ -49,6 +52,8 @@ import java.util.List;
  */
 public class SqlClient {
 
+	private static final Logger LOG = LoggerFactory.getLogger(SqlClient.class);
+
 	private final boolean isEmbedded;
 	private final CliOptions options;
 
@@ -117,6 +122,7 @@ public class SqlClient {
 		}
 
 		System.out.println("Reading session environment from: " + envUrl);
+		LOG.info("Using session environment file: {}", envUrl);
 		try {
 			return Environment.parse(envUrl);
 		} catch (IOException e) {
@@ -148,11 +154,13 @@ public class SqlClient {
 						// make space in terminal
 						System.out.println();
 						System.out.println();
+						LOG.error("SQL Client must stop.", e);
 						throw e;
 					} catch (Throwable t) {
 						// make space in terminal
 						System.out.println();
 						System.out.println();
+						LOG.error("SQL Client must stop. Unexpected exception. This is a bug. Please consider filing an issue.", t);
 						throw new SqlClientException("Unexpected exception. This is a bug. Please consider filing an issue.", t);
 					}
 				}

http://git-wip-us.apache.org/repos/asf/flink/blob/fdfce98a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliClient.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliClient.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliClient.java
index 87a7b8b..233e49b 100644
--- a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliClient.java
+++ b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliClient.java
@@ -39,6 +39,8 @@ import org.jline.utils.AttributedString;
 import org.jline.utils.AttributedStringBuilder;
 import org.jline.utils.AttributedStyle;
 import org.jline.utils.InfoCmp;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOError;
 import java.io.IOException;
@@ -54,6 +56,8 @@ import java.util.Map;
  */
 public class CliClient {
 
+	private static final Logger LOG = LoggerFactory.getLogger(CliClient.class);
+
 	private final Executor executor;
 
 	private final SessionContext context;
@@ -367,6 +371,7 @@ public class CliClient {
 	// --------------------------------------------------------------------------------------------
 
 	private void printException(Throwable t) {
+		LOG.warn(CliStrings.MESSAGE_SQL_EXECUTION_ERROR, t);
 		terminal.writer().println(CliStrings.messageError(CliStrings.MESSAGE_SQL_EXECUTION_ERROR, t).toAnsi());
 		terminal.flush();
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/fdfce98a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliTableResultView.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliTableResultView.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliTableResultView.java
index 4d9c69b..2c630d3 100644
--- a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliTableResultView.java
+++ b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliTableResultView.java
@@ -53,6 +53,7 @@ public class CliTableResultView extends CliResultView<CliTableResultView.ResultT
 	private int pageCount;
 	private int page;
 	private LocalTime lastRetrieval;
+	private int previousResultsPage;
 
 	private static final int DEFAULT_REFRESH_INTERVAL = 3; // every 1s
 	private static final int MIN_REFRESH_INTERVAL = 1; // every 100ms
@@ -66,6 +67,7 @@ public class CliTableResultView extends CliResultView<CliTableResultView.ResultT
 		page = LAST_PAGE;
 
 		previousResults = Collections.emptyList();
+		previousResultsPage = 1;
 		results = Collections.emptyList();
 	}
 
@@ -264,7 +266,15 @@ public class CliTableResultView extends CliResultView<CliTableResultView.ResultT
 				.collect(Collectors.toList());
 
 		// update results
-		previousResults = results;
+		if (previousResultsPage == retrievalPage) {
+			// only use the previous results if the current page number has not changed
+			// this allows for updated results when the key space remains constant
+			previousResults = results;
+		} else {
+			previousResults = null;
+			previousResultsPage = retrievalPage;
+		}
+
 		results = stringRows;
 
 		// check if selected row is still valid
@@ -301,7 +311,7 @@ public class CliTableResultView extends CliResultView<CliTableResultView.ResultT
 	private void gotoPage() {
 		final CliInputView view = new CliInputView(
 			client,
-			CliStrings.INPUT_ENTER_PAGE,
+			CliStrings.INPUT_ENTER_PAGE + " [1 to " + pageCount + "]",
 			(s) -> {
 				// validate input
 				final int newPage;

http://git-wip-us.apache.org/repos/asf/flink/blob/fdfce98a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Environment.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Environment.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Environment.java
index 7169fe1..79efda8 100644
--- a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Environment.java
+++ b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Environment.java
@@ -19,6 +19,7 @@
 package org.apache.flink.table.client.config;
 
 import org.apache.flink.table.client.SqlClientException;
+import org.apache.flink.table.descriptors.DescriptorProperties;
 import org.apache.flink.table.descriptors.TableDescriptor;
 import org.apache.flink.table.descriptors.TableDescriptorValidator;
 
@@ -91,6 +92,23 @@ public class Environment {
 		return deployment;
 	}
 
+	@Override
+	public String toString() {
+		final StringBuilder sb = new StringBuilder();
+		sb.append("===================== Tables =====================\n");
+		tables.forEach((name, table) -> {
+			sb.append("- name: ").append(name).append("\n");
+			final DescriptorProperties props = new DescriptorProperties(true);
+			table.addProperties(props);
+			props.asMap().forEach((k, v) -> sb.append("  ").append(k).append(": ").append(v).append('\n'));
+		});
+		sb.append("=================== Execution ====================\n");
+		execution.toProperties().forEach((k, v) -> sb.append(k).append(": ").append(v).append('\n'));
+		sb.append("=================== Deployment ===================\n");
+		deployment.toProperties().forEach((k, v) -> sb.append(k).append(": ").append(v).append('\n'));
+		return sb.toString();
+	}
+
 	// --------------------------------------------------------------------------------------------
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/fdfce98a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/SessionContext.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/SessionContext.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/SessionContext.java
index 0b6ee2c..6fa640f 100644
--- a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/SessionContext.java
+++ b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/SessionContext.java
@@ -56,6 +56,12 @@ public class SessionContext {
 		return Environment.enrich(defaultEnvironment, sessionProperties);
 	}
 
+	public SessionContext copy() {
+		final SessionContext session = new SessionContext(name, defaultEnvironment);
+		session.sessionProperties.putAll(sessionProperties);
+		return session;
+	}
+
 	@Override
 	public boolean equals(Object o) {
 		if (this == o) {

http://git-wip-us.apache.org/repos/asf/flink/blob/fdfce98a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
index 84b7b28..2d54fc8 100644
--- a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
+++ b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
@@ -81,7 +81,7 @@ public class ExecutionContext<T> {
 
 	public ExecutionContext(Environment defaultEnvironment, SessionContext sessionContext, List<URL> dependencies,
 			Configuration flinkConfig, Options commandLineOptions, List<CustomCommandLine<?>> availableCommandLines) {
-		this.sessionContext = sessionContext;
+		this.sessionContext = sessionContext.copy(); // create internal copy because session context is mutable
 		this.mergedEnv = Environment.merge(defaultEnvironment, sessionContext.getEnvironment());
 		this.dependencies = dependencies;
 		this.flinkConfig = flinkConfig;

http://git-wip-us.apache.org/repos/asf/flink/blob/fdfce98a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
index 30fa3c0..0329648 100644
--- a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
+++ b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
@@ -46,6 +46,8 @@ import org.apache.flink.types.Row;
 import org.apache.flink.util.StringUtils;
 
 import org.apache.commons.cli.Options;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.IOException;
@@ -64,6 +66,8 @@ import java.util.Map;
  */
 public class LocalExecutor implements Executor {
 
+	private static final Logger LOG = LoggerFactory.getLogger(LocalExecutor.class);
+
 	private static final String DEFAULT_ENV_FILE = "sql-client-defaults.yaml";
 
 	// deployment
@@ -125,6 +129,7 @@ public class LocalExecutor implements Executor {
 				} catch (MalformedURLException e) {
 					throw new SqlClientException(e);
 				}
+				LOG.info("Using default environment file: {}", defaultEnv);
 			} else {
 				System.out.println("not found.");
 			}
@@ -284,6 +289,7 @@ public class LocalExecutor implements Executor {
 		}
 
 		// stop retrieval and remove the result
+		LOG.info("Cancelling job {} and result retrieval.", resultId);
 		result.close();
 		resultStore.removeResult(resultId);
 
@@ -347,7 +353,14 @@ public class LocalExecutor implements Executor {
 		resultStore.storeResult(resultId, result);
 
 		// create execution
-		final Runnable program = () -> deployJob(context, jobGraph, result);
+		final Runnable program = () -> {
+			LOG.info("Submitting job {} for query {}`", jobGraph.getJobID(), jobName);
+			if (LOG.isDebugEnabled()) {
+				LOG.debug("Submitting job {} with the following environment: \n{}",
+					jobGraph.getJobID(), context.getMergedEnvironment());
+			}
+			deployJob(context, jobGraph, result);
+		};
 
 		// start result retrieval
 		result.startRetrieval(program);
@@ -468,6 +481,11 @@ public class LocalExecutor implements Executor {
 		} catch (Exception e) {
 			throw new SqlClientException("Could not load all required JAR files.", e);
 		}
+
+		if (LOG.isDebugEnabled()) {
+			LOG.debug("Using the following dependencies: {}", dependencies);
+		}
+
 		return dependencies;
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/fdfce98a/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java b/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java
index 378d475..48cc648 100644
--- a/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java
+++ b/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java
@@ -109,6 +109,10 @@ public class LocalExecutorITCase extends TestLogger {
 		final Executor executor = createDefaultExecutor(clusterClient);
 		final SessionContext session = new SessionContext("test-session", new Environment());
 
+		session.setSessionProperty("execution.result-mode", "changelog");
+
+		executor.getSessionProperties(session);
+
 		// modify defaults
 		session.setSessionProperty("execution.result-mode", "table");
 

http://git-wip-us.apache.org/repos/asf/flink/blob/fdfce98a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/exceptions.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/exceptions.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/exceptions.scala
index 952342a..cfc73e7 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/exceptions.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/exceptions.scala
@@ -155,7 +155,7 @@ case class NoMatchingTableSourceException(
     extends RuntimeException(
       s"Could not find a table source factory in the classpath satisfying the " +
         s"following properties: \n" +
-        s"${properties.map(e => DescriptorProperties.toString(e._1, e._2)).mkString("\n")}",
+        s"${DescriptorProperties.toString(properties)}",
       cause) {
 
   def this(properties: Map[String, String]) = this(properties, null)
@@ -174,7 +174,7 @@ case class AmbiguousTableSourceException(
     extends RuntimeException(
       s"More than one table source factory in the classpath satisfying the " +
         s"following properties: \n" +
-        s"${properties.map(e => DescriptorProperties.toString(e._1, e._2)).mkString("\n")}",
+        s"${DescriptorProperties.toString(properties)}",
       cause) {
 
   def this(properties: Map[String, String]) = this(properties, null)

http://git-wip-us.apache.org/repos/asf/flink/blob/fdfce98a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/DescriptorProperties.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/DescriptorProperties.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/DescriptorProperties.scala
index 555d92d..4b0b60d 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/DescriptorProperties.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/DescriptorProperties.scala
@@ -1098,6 +1098,13 @@ object DescriptorProperties {
     toString(key) + "=" + toString(value)
   }
 
+  def toString(kv: Map[String, String]): String = {
+    kv.map(e => DescriptorProperties.toString(e._1, e._2))
+      .toSeq
+      .sorted
+      .mkString("\n")
+  }
+
   // the following methods help for Scala <-> Java interfaces
   // most of these methods are not necessary once we upgraded to Scala 2.12