You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2018/04/25 15:45:21 UTC
flink git commit: [FLINK-8686] [sql-client] Improve basic embedded SQL client
Repository: flink
Updated Branches:
refs/heads/master 063aeb17d -> fdfce98ab
[FLINK-8686] [sql-client] Improve basic embedded SQL client
- Fix invalid cached session properties
- Sort given table source properties
- Add logging for exceptions and cluster communication
- Add valid page range
- Fix highlighting during result refresh
This closes #5867.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fdfce98a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fdfce98a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fdfce98a
Branch: refs/heads/master
Commit: fdfce98ab741cb2591407c425c75cd0a283401a6
Parents: 063aeb1
Author: Timo Walther <tw...@apache.org>
Authored: Mon Apr 16 13:40:53 2018 +0200
Committer: Timo Walther <tw...@apache.org>
Committed: Wed Apr 25 17:43:50 2018 +0200
----------------------------------------------------------------------
.../apache/flink/table/client/SqlClient.java | 8 ++++++++
.../flink/table/client/cli/CliClient.java | 5 +++++
.../table/client/cli/CliTableResultView.java | 14 ++++++++++++--
.../flink/table/client/config/Environment.java | 18 ++++++++++++++++++
.../table/client/gateway/SessionContext.java | 6 ++++++
.../client/gateway/local/ExecutionContext.java | 2 +-
.../client/gateway/local/LocalExecutor.java | 20 +++++++++++++++++++-
.../gateway/local/LocalExecutorITCase.java | 4 ++++
.../org/apache/flink/table/api/exceptions.scala | 4 ++--
.../descriptors/DescriptorProperties.scala | 7 +++++++
10 files changed, 82 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/fdfce98a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/SqlClient.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/SqlClient.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/SqlClient.java
index 82ef4dd..65d986c 100644
--- a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/SqlClient.java
+++ b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/SqlClient.java
@@ -26,6 +26,9 @@ import org.apache.flink.table.client.gateway.Executor;
import org.apache.flink.table.client.gateway.SessionContext;
import org.apache.flink.table.client.gateway.local.LocalExecutor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import java.io.IOException;
import java.net.URL;
import java.util.Arrays;
@@ -49,6 +52,8 @@ import java.util.List;
*/
public class SqlClient {
+ private static final Logger LOG = LoggerFactory.getLogger(SqlClient.class);
+
private final boolean isEmbedded;
private final CliOptions options;
@@ -117,6 +122,7 @@ public class SqlClient {
}
System.out.println("Reading session environment from: " + envUrl);
+ LOG.info("Using session environment file: {}", envUrl);
try {
return Environment.parse(envUrl);
} catch (IOException e) {
@@ -148,11 +154,13 @@ public class SqlClient {
// make space in terminal
System.out.println();
System.out.println();
+ LOG.error("SQL Client must stop.", e);
throw e;
} catch (Throwable t) {
// make space in terminal
System.out.println();
System.out.println();
+ LOG.error("SQL Client must stop. Unexpected exception. This is a bug. Please consider filing an issue.", t);
throw new SqlClientException("Unexpected exception. This is a bug. Please consider filing an issue.", t);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/fdfce98a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliClient.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliClient.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliClient.java
index 87a7b8b..233e49b 100644
--- a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliClient.java
+++ b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliClient.java
@@ -39,6 +39,8 @@ import org.jline.utils.AttributedString;
import org.jline.utils.AttributedStringBuilder;
import org.jline.utils.AttributedStyle;
import org.jline.utils.InfoCmp;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.IOError;
import java.io.IOException;
@@ -54,6 +56,8 @@ import java.util.Map;
*/
public class CliClient {
+ private static final Logger LOG = LoggerFactory.getLogger(CliClient.class);
+
private final Executor executor;
private final SessionContext context;
@@ -367,6 +371,7 @@ public class CliClient {
// --------------------------------------------------------------------------------------------
private void printException(Throwable t) {
+ LOG.warn(CliStrings.MESSAGE_SQL_EXECUTION_ERROR, t);
terminal.writer().println(CliStrings.messageError(CliStrings.MESSAGE_SQL_EXECUTION_ERROR, t).toAnsi());
terminal.flush();
}
http://git-wip-us.apache.org/repos/asf/flink/blob/fdfce98a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliTableResultView.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliTableResultView.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliTableResultView.java
index 4d9c69b..2c630d3 100644
--- a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliTableResultView.java
+++ b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliTableResultView.java
@@ -53,6 +53,7 @@ public class CliTableResultView extends CliResultView<CliTableResultView.ResultT
private int pageCount;
private int page;
private LocalTime lastRetrieval;
+ private int previousResultsPage;
private static final int DEFAULT_REFRESH_INTERVAL = 3; // every 1s
private static final int MIN_REFRESH_INTERVAL = 1; // every 100ms
@@ -66,6 +67,7 @@ public class CliTableResultView extends CliResultView<CliTableResultView.ResultT
page = LAST_PAGE;
previousResults = Collections.emptyList();
+ previousResultsPage = 1;
results = Collections.emptyList();
}
@@ -264,7 +266,15 @@ public class CliTableResultView extends CliResultView<CliTableResultView.ResultT
.collect(Collectors.toList());
// update results
- previousResults = results;
+ if (previousResultsPage == retrievalPage) {
+ // only use the previous results if the current page number has not changed
+ // this allows for updated results when the key space remains constant
+ previousResults = results;
+ } else {
+ previousResults = null;
+ previousResultsPage = retrievalPage;
+ }
+
results = stringRows;
// check if selected row is still valid
@@ -301,7 +311,7 @@ public class CliTableResultView extends CliResultView<CliTableResultView.ResultT
private void gotoPage() {
final CliInputView view = new CliInputView(
client,
- CliStrings.INPUT_ENTER_PAGE,
+ CliStrings.INPUT_ENTER_PAGE + " [1 to " + pageCount + "]",
(s) -> {
// validate input
final int newPage;
http://git-wip-us.apache.org/repos/asf/flink/blob/fdfce98a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Environment.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Environment.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Environment.java
index 7169fe1..79efda8 100644
--- a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Environment.java
+++ b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Environment.java
@@ -19,6 +19,7 @@
package org.apache.flink.table.client.config;
import org.apache.flink.table.client.SqlClientException;
+import org.apache.flink.table.descriptors.DescriptorProperties;
import org.apache.flink.table.descriptors.TableDescriptor;
import org.apache.flink.table.descriptors.TableDescriptorValidator;
@@ -91,6 +92,23 @@ public class Environment {
return deployment;
}
+ @Override
+ public String toString() {
+ final StringBuilder sb = new StringBuilder();
+ sb.append("===================== Tables =====================\n");
+ tables.forEach((name, table) -> {
+ sb.append("- name: ").append(name).append("\n");
+ final DescriptorProperties props = new DescriptorProperties(true);
+ table.addProperties(props);
+ props.asMap().forEach((k, v) -> sb.append(" ").append(k).append(": ").append(v).append('\n'));
+ });
+ sb.append("=================== Execution ====================\n");
+ execution.toProperties().forEach((k, v) -> sb.append(k).append(": ").append(v).append('\n'));
+ sb.append("=================== Deployment ===================\n");
+ deployment.toProperties().forEach((k, v) -> sb.append(k).append(": ").append(v).append('\n'));
+ return sb.toString();
+ }
+
// --------------------------------------------------------------------------------------------
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/fdfce98a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/SessionContext.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/SessionContext.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/SessionContext.java
index 0b6ee2c..6fa640f 100644
--- a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/SessionContext.java
+++ b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/SessionContext.java
@@ -56,6 +56,12 @@ public class SessionContext {
return Environment.enrich(defaultEnvironment, sessionProperties);
}
+ public SessionContext copy() {
+ final SessionContext session = new SessionContext(name, defaultEnvironment);
+ session.sessionProperties.putAll(sessionProperties);
+ return session;
+ }
+
@Override
public boolean equals(Object o) {
if (this == o) {
http://git-wip-us.apache.org/repos/asf/flink/blob/fdfce98a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
index 84b7b28..2d54fc8 100644
--- a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
+++ b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
@@ -81,7 +81,7 @@ public class ExecutionContext<T> {
public ExecutionContext(Environment defaultEnvironment, SessionContext sessionContext, List<URL> dependencies,
Configuration flinkConfig, Options commandLineOptions, List<CustomCommandLine<?>> availableCommandLines) {
- this.sessionContext = sessionContext;
+ this.sessionContext = sessionContext.copy(); // create internal copy because session context is mutable
this.mergedEnv = Environment.merge(defaultEnvironment, sessionContext.getEnvironment());
this.dependencies = dependencies;
this.flinkConfig = flinkConfig;
http://git-wip-us.apache.org/repos/asf/flink/blob/fdfce98a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
index 30fa3c0..0329648 100644
--- a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
+++ b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
@@ -46,6 +46,8 @@ import org.apache.flink.types.Row;
import org.apache.flink.util.StringUtils;
import org.apache.commons.cli.Options;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
@@ -64,6 +66,8 @@ import java.util.Map;
*/
public class LocalExecutor implements Executor {
+ private static final Logger LOG = LoggerFactory.getLogger(LocalExecutor.class);
+
private static final String DEFAULT_ENV_FILE = "sql-client-defaults.yaml";
// deployment
@@ -125,6 +129,7 @@ public class LocalExecutor implements Executor {
} catch (MalformedURLException e) {
throw new SqlClientException(e);
}
+ LOG.info("Using default environment file: {}", defaultEnv);
} else {
System.out.println("not found.");
}
@@ -284,6 +289,7 @@ public class LocalExecutor implements Executor {
}
// stop retrieval and remove the result
+ LOG.info("Cancelling job {} and result retrieval.", resultId);
result.close();
resultStore.removeResult(resultId);
@@ -347,7 +353,14 @@ public class LocalExecutor implements Executor {
resultStore.storeResult(resultId, result);
// create execution
- final Runnable program = () -> deployJob(context, jobGraph, result);
+ final Runnable program = () -> { // logs the submission, then deploys the job
+ LOG.info("Submitting job {} for query {}", jobGraph.getJobID(), jobName);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Submitting job {} with the following environment: \n{}",
+ jobGraph.getJobID(), context.getMergedEnvironment());
+ }
+ deployJob(context, jobGraph, result);
+ };
// start result retrieval
result.startRetrieval(program);
@@ -468,6 +481,11 @@ public class LocalExecutor implements Executor {
} catch (Exception e) {
throw new SqlClientException("Could not load all required JAR files.", e);
}
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Using the following dependencies: {}", dependencies);
+ }
+
return dependencies;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/fdfce98a/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java b/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java
index 378d475..48cc648 100644
--- a/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java
+++ b/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java
@@ -109,6 +109,10 @@ public class LocalExecutorITCase extends TestLogger {
final Executor executor = createDefaultExecutor(clusterClient);
final SessionContext session = new SessionContext("test-session", new Environment());
+ session.setSessionProperty("execution.result-mode", "changelog");
+
+ executor.getSessionProperties(session);
+
// modify defaults
session.setSessionProperty("execution.result-mode", "table");
http://git-wip-us.apache.org/repos/asf/flink/blob/fdfce98a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/exceptions.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/exceptions.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/exceptions.scala
index 952342a..cfc73e7 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/exceptions.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/exceptions.scala
@@ -155,7 +155,7 @@ case class NoMatchingTableSourceException(
extends RuntimeException(
s"Could not find a table source factory in the classpath satisfying the " +
s"following properties: \n" +
- s"${properties.map(e => DescriptorProperties.toString(e._1, e._2)).mkString("\n")}",
+ s"${DescriptorProperties.toString(properties)}",
cause) {
def this(properties: Map[String, String]) = this(properties, null)
@@ -174,7 +174,7 @@ case class AmbiguousTableSourceException(
extends RuntimeException(
s"More than one table source factory in the classpath satisfying the " +
s"following properties: \n" +
- s"${properties.map(e => DescriptorProperties.toString(e._1, e._2)).mkString("\n")}",
+ s"${DescriptorProperties.toString(properties)}",
cause) {
def this(properties: Map[String, String]) = this(properties, null)
http://git-wip-us.apache.org/repos/asf/flink/blob/fdfce98a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/DescriptorProperties.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/DescriptorProperties.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/DescriptorProperties.scala
index 555d92d..4b0b60d 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/DescriptorProperties.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/DescriptorProperties.scala
@@ -1098,6 +1098,13 @@ object DescriptorProperties {
toString(key) + "=" + toString(value)
}
+ def toString(kv: Map[String, String]): String = {
+ kv.map(e => DescriptorProperties.toString(e._1, e._2))
+ .toSeq
+ .sorted
+ .mkString("\n")
+ }
+
// the following methods help for Scala <-> Java interfaces
// most of these methods are not necessary once we upgraded to Scala 2.12