You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2018/07/15 19:49:14 UTC
[2/2] flink git commit: [FLINK-8858] [sql-client] Add support for INSERT INTO in SQL Client
[FLINK-8858] [sql-client] Add support for INSERT INTO in SQL Client
This closes #6332.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/695bc56a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/695bc56a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/695bc56a
Branch: refs/heads/master
Commit: 695bc56a9e20b9d86eea14a02899b400d324a7ea
Parents: a7be2e1
Author: Timo Walther <tw...@apache.org>
Authored: Thu Jul 5 18:11:18 2018 +0200
Committer: Timo Walther <tw...@apache.org>
Committed: Sun Jul 15 21:47:31 2018 +0200
----------------------------------------------------------------------
flink-libraries/flink-sql-client/pom.xml | 6 +-
.../apache/flink/table/client/SqlClient.java | 26 ++-
.../flink/table/client/cli/CliClient.java | 146 +++++++++++-----
.../flink/table/client/cli/CliOptions.java | 15 +-
.../table/client/cli/CliOptionsParser.java | 25 ++-
.../flink/table/client/cli/CliStrings.java | 7 +
.../table/client/cli/SqlCommandParser.java | 11 +-
.../flink/table/client/gateway/Executor.java | 9 +
.../client/gateway/ProgramTargetDescriptor.java | 80 +++++++++
.../local/ChangelogCollectStreamResult.java | 99 -----------
.../client/gateway/local/ChangelogResult.java | 38 ----
.../gateway/local/CollectStreamResult.java | 175 -------------------
.../client/gateway/local/DynamicResult.java | 65 -------
.../client/gateway/local/ExecutionContext.java | 8 +
.../client/gateway/local/LocalExecutor.java | 70 +++++++-
.../local/MaterializedCollectBatchResult.java | 159 -----------------
.../local/MaterializedCollectStreamResult.java | 135 --------------
.../gateway/local/MaterializedResult.java | 43 -----
.../client/gateway/local/ProgramDeployer.java | 122 +++++++++----
.../table/client/gateway/local/ResultStore.java | 4 +
.../gateway/local/result/BasicResult.java | 53 ++++++
.../result/ChangelogCollectStreamResult.java | 99 +++++++++++
.../gateway/local/result/ChangelogResult.java | 38 ++++
.../local/result/CollectStreamResult.java | 168 ++++++++++++++++++
.../gateway/local/result/DynamicResult.java | 61 +++++++
.../result/MaterializedCollectBatchResult.java | 152 ++++++++++++++++
.../result/MaterializedCollectStreamResult.java | 135 ++++++++++++++
.../local/result/MaterializedResult.java | 43 +++++
.../client/gateway/local/result/Result.java | 33 ++++
.../src/test/assembly/test-table-factories.xml | 49 ++++++
.../test/assembly/test-table-source-factory.xml | 47 -----
.../flink/table/client/cli/CliClientTest.java | 161 +++++++++++++++++
.../client/gateway/local/DependencyTest.java | 14 +-
.../client/gateway/local/EnvironmentTest.java | 1 +
.../gateway/local/ExecutionContextTest.java | 51 ++++++
.../gateway/local/LocalExecutorITCase.java | 66 ++++++-
.../gateway/utils/TestTableSinkFactory.java | 126 +++++++++++++
.../gateway/utils/TestTableSourceFactory.java | 8 +-
.../test/resources/test-factory-services-file | 1 +
.../resources/test-sql-client-defaults.yaml | 17 ++
.../test/resources/test-sql-client-factory.yaml | 2 +-
.../plan/nodes/PhysicalTableSourceScan.scala | 3 +-
42 files changed, 1705 insertions(+), 866 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/695bc56a/flink-libraries/flink-sql-client/pom.xml
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-sql-client/pom.xml b/flink-libraries/flink-sql-client/pom.xml
index a9fc00e..2c6e2ba 100644
--- a/flink-libraries/flink-sql-client/pom.xml
+++ b/flink-libraries/flink-sql-client/pom.xml
@@ -177,16 +177,16 @@ under the License.
<version>2.4</version>
<executions>
<execution>
- <id>create-table-source-factory-jar</id>
+ <id>create-table-factories-jar</id>
<phase>process-test-classes</phase>
<goals>
<goal>single</goal>
</goals>
<configuration>
- <finalName>table-source-factory</finalName>
+ <finalName>table-factories</finalName>
<attach>false</attach>
<descriptors>
- <descriptor>src/test/assembly/test-table-source-factory.xml</descriptor>
+ <descriptor>src/test/assembly/test-table-factories.xml</descriptor>
</descriptors>
</configuration>
</execution>
http://git-wip-us.apache.org/repos/asf/flink/blob/695bc56a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/SqlClient.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/SqlClient.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/SqlClient.java
index 65d986c..3efaa6a 100644
--- a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/SqlClient.java
+++ b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/SqlClient.java
@@ -97,14 +97,34 @@ public class SqlClient {
// add shutdown hook
Runtime.getRuntime().addShutdownHook(new EmbeddedShutdownThread(context, executor));
- // start CLI
- final CliClient cli = new CliClient(context, executor);
- cli.open();
+ // do the actual work
+ openCli(context, executor);
} else {
throw new SqlClientException("Gateway mode is not supported yet.");
}
}
+ /**
+ * Opens the CLI client for executing SQL statements.
+ *
+ * @param context session context
+ * @param executor executor
+ */
+ private void openCli(SessionContext context, Executor executor) {
+ final CliClient cli = new CliClient(context, executor);
+ // interactive CLI mode
+ if (options.getUpdateStatement() == null) {
+ cli.open();
+ }
+ // execute single update statement
+ else {
+ final boolean success = cli.submitUpdate(options.getUpdateStatement());
+ if (!success) {
+ throw new SqlClientException("Could not submit given SQL update statement to cluster.");
+ }
+ }
+ }
+
// --------------------------------------------------------------------------------------------
private static void shutdown(SessionContext context, Executor executor) {
http://git-wip-us.apache.org/repos/asf/flink/blob/695bc56a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliClient.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliClient.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliClient.java
index 7bf7bb7..576f518 100644
--- a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliClient.java
+++ b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliClient.java
@@ -20,9 +20,9 @@ package org.apache.flink.table.client.cli;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.client.SqlClientException;
-import org.apache.flink.table.client.cli.SqlCommandParser.SqlCommand;
import org.apache.flink.table.client.cli.SqlCommandParser.SqlCommandCall;
import org.apache.flink.table.client.gateway.Executor;
+import org.apache.flink.table.client.gateway.ProgramTargetDescriptor;
import org.apache.flink.table.client.gateway.ResultDescriptor;
import org.apache.flink.table.client.gateway.SessionContext;
import org.apache.flink.table.client.gateway.SqlExecutionException;
@@ -50,6 +50,7 @@ import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
/**
* SQL CLI client.
@@ -85,6 +86,9 @@ public class CliClient {
terminal = TerminalBuilder.builder()
.name(CliStrings.CLI_NAME)
.build();
+ // make space from previous output and test the writer
+ terminal.writer().println();
+ terminal.writer().flush();
} catch (IOException e) {
throw new SqlClientException("Error opening command line interface.", e);
}
@@ -149,6 +153,9 @@ public class CliClient {
return executor;
}
+ /**
+ * Opens the interactive CLI shell.
+ */
public void open() {
isRunning = true;
@@ -173,55 +180,91 @@ public class CliClient {
if (line == null || line.equals("")) {
continue;
}
+ final Optional<SqlCommandCall> cmdCall = parseCommand(line);
+ cmdCall.ifPresent(this::callCommand);
+ }
+ }
- final SqlCommandCall cmdCall = SqlCommandParser.parse(line);
-
- if (cmdCall == null) {
- terminal.writer().println(CliStrings.messageError(CliStrings.MESSAGE_UNKNOWN_SQL));
- continue;
- }
+ /**
+ * Submits a SQL update statement and prints status information and/or errors on the terminal.
+ *
+ * @param statement SQL update statement
+ * @return flag to indicate if the submission was successful or not
+ */
+ public boolean submitUpdate(String statement) {
+ terminal.writer().println(CliStrings.messageInfo(CliStrings.MESSAGE_WILL_EXECUTE).toAnsi());
+ terminal.writer().println(new AttributedString(statement).toString());
+ terminal.flush();
+ final Optional<SqlCommandCall> parsedStatement = parseCommand(statement);
+ // only support INSERT INTO
+ return parsedStatement.map(cmdCall -> {
switch (cmdCall.command) {
- case QUIT:
- case EXIT:
- callQuit(cmdCall);
- break;
- case CLEAR:
- callClear(cmdCall);
- break;
- case RESET:
- callReset(cmdCall);
- break;
- case SET:
- callSet(cmdCall);
- break;
- case HELP:
- callHelp(cmdCall);
- break;
- case SHOW_TABLES:
- callShowTables(cmdCall);
- break;
- case SHOW_FUNCTIONS:
- callShowFunctions(cmdCall);
- break;
- case DESCRIBE:
- callDescribe(cmdCall);
- break;
- case EXPLAIN:
- callExplain(cmdCall);
- break;
- case SELECT:
- callSelect(cmdCall);
- break;
- case SOURCE:
- callSource(cmdCall);
- break;
+ case INSERT_INTO:
+ return callInsertInto(cmdCall);
+ default:
+ terminal.writer().println(CliStrings.messageError(CliStrings.MESSAGE_UNSUPPORTED_SQL).toAnsi());
+ terminal.flush();
+ return false;
}
- }
+ }).orElse(false);
}
// --------------------------------------------------------------------------------------------
+ private Optional<SqlCommandCall> parseCommand(String line) {
+ final Optional<SqlCommandCall> parsedLine = SqlCommandParser.parse(line);
+ if (!parsedLine.isPresent()) {
+ terminal.writer().println(CliStrings.messageError(CliStrings.MESSAGE_UNKNOWN_SQL).toAnsi());
+ terminal.flush();
+ }
+ return parsedLine;
+ }
+
+ private void callCommand(SqlCommandCall cmdCall) {
+ switch (cmdCall.command) {
+ case QUIT:
+ case EXIT:
+ callQuit(cmdCall);
+ break;
+ case CLEAR:
+ callClear(cmdCall);
+ break;
+ case RESET:
+ callReset(cmdCall);
+ break;
+ case SET:
+ callSet(cmdCall);
+ break;
+ case HELP:
+ callHelp(cmdCall);
+ break;
+ case SHOW_TABLES:
+ callShowTables(cmdCall);
+ break;
+ case SHOW_FUNCTIONS:
+ callShowFunctions(cmdCall);
+ break;
+ case DESCRIBE:
+ callDescribe(cmdCall);
+ break;
+ case EXPLAIN:
+ callExplain(cmdCall);
+ break;
+ case SELECT:
+ callSelect(cmdCall);
+ break;
+ case INSERT_INTO:
+ callInsertInto(cmdCall);
+ break;
+ case SOURCE:
+ callSource(cmdCall);
+ break;
+ default:
+ throw new SqlClientException("Unsupported command: " + cmdCall.command);
+ }
+ }
+
private void callQuit(SqlCommandCall cmdCall) {
terminal.writer().println(CliStrings.MESSAGE_QUIT);
terminal.flush();
@@ -354,6 +397,22 @@ public class CliClient {
}
}
+ private boolean callInsertInto(SqlCommandCall cmdCall) {
+ terminal.writer().println(CliStrings.messageInfo(CliStrings.MESSAGE_SUBMITTING_STATEMENT).toAnsi());
+ terminal.flush();
+
+ try {
+ final ProgramTargetDescriptor programTarget = executor.executeUpdate(context, cmdCall.operands[0]);
+ terminal.writer().println(CliStrings.messageInfo(CliStrings.MESSAGE_STATEMENT_SUBMITTED).toAnsi());
+ terminal.writer().println(programTarget.toString());
+ terminal.flush();
+ } catch (SqlExecutionException e) {
+ printException(e);
+ return false;
+ }
+ return true;
+ }
+
private void callSource(SqlCommandCall cmdCall) {
final String pathString = cmdCall.operands[0];
@@ -384,7 +443,8 @@ public class CliClient {
terminal.flush();
// try to run it
- callSelect(new SqlCommandCall(SqlCommand.SELECT, new String[] { stmt }));
+ final Optional<SqlCommandCall> call = parseCommand(stmt);
+ call.ifPresent(this::callCommand);
}
// --------------------------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/695bc56a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliOptions.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliOptions.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliOptions.java
index eddcc77..06a063a 100644
--- a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliOptions.java
+++ b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliOptions.java
@@ -33,14 +33,23 @@ public class CliOptions {
private final URL defaults;
private final List<URL> jars;
private final List<URL> libraryDirs;
+ private final String updateStatement;
- public CliOptions(boolean isPrintHelp, String sessionId, URL environment, URL defaults, List<URL> jars, List<URL> libraryDirs) {
+ public CliOptions(
+ boolean isPrintHelp,
+ String sessionId,
+ URL environment,
+ URL defaults,
+ List<URL> jars,
+ List<URL> libraryDirs,
+ String updateStatement) {
this.isPrintHelp = isPrintHelp;
this.sessionId = sessionId;
this.environment = environment;
this.defaults = defaults;
this.jars = jars;
this.libraryDirs = libraryDirs;
+ this.updateStatement = updateStatement;
}
public boolean isPrintHelp() {
@@ -66,4 +75,8 @@ public class CliOptions {
public List<URL> getLibraryDirs() {
return libraryDirs;
}
+
+ public String getUpdateStatement() {
+ return updateStatement;
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/695bc56a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliOptionsParser.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliOptionsParser.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliOptionsParser.java
index 77119ef..b924779 100644
--- a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliOptionsParser.java
+++ b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliOptionsParser.java
@@ -103,6 +103,20 @@ public class CliOptionsParser {
"functions, table sources, or sinks. Can be used multiple times.")
.build();
+ public static final Option OPTION_UPDATE = Option
+ .builder("u")
+ .required(false)
+ .longOpt("update")
+ .numberOfArgs(1)
+ .argName("SQL update statement")
+ .desc(
+ "Experimental (for testing only!): Instructs the SQL Client to immediately execute " +
+ "the given update statement after starting up. The process is shut down after the " +
+ "statement has been submitted to the cluster and returns an appropriate return code. " +
+ "Currently, this feature is only supported for INSERT INTO statements that declare " +
+ "the target sink table.")
+ .build();
+
private static final Options EMBEDDED_MODE_CLIENT_OPTIONS = getEmbeddedModeClientOptions(new Options());
private static final Options GATEWAY_MODE_CLIENT_OPTIONS = getGatewayModeClientOptions(new Options());
private static final Options GATEWAY_MODE_GATEWAY_OPTIONS = getGatewayModeGatewayOptions(new Options());
@@ -118,6 +132,7 @@ public class CliOptionsParser {
options.addOption(OPTION_DEFAULTS);
options.addOption(OPTION_JAR);
options.addOption(OPTION_LIBRARY);
+ options.addOption(OPTION_UPDATE);
return options;
}
@@ -125,6 +140,7 @@ public class CliOptionsParser {
buildGeneralOptions(options);
options.addOption(OPTION_SESSION);
options.addOption(OPTION_ENVIRONMENT);
+ options.addOption(OPTION_UPDATE);
return options;
}
@@ -218,7 +234,8 @@ public class CliOptionsParser {
checkUrl(line, CliOptionsParser.OPTION_ENVIRONMENT),
checkUrl(line, CliOptionsParser.OPTION_DEFAULTS),
checkUrls(line, CliOptionsParser.OPTION_JAR),
- checkUrls(line, CliOptionsParser.OPTION_LIBRARY)
+ checkUrls(line, CliOptionsParser.OPTION_LIBRARY),
+ line.getOptionValue(CliOptionsParser.OPTION_UPDATE.getOpt())
);
}
catch (ParseException e) {
@@ -236,7 +253,8 @@ public class CliOptionsParser {
checkUrl(line, CliOptionsParser.OPTION_ENVIRONMENT),
null,
checkUrls(line, CliOptionsParser.OPTION_JAR),
- checkUrls(line, CliOptionsParser.OPTION_LIBRARY)
+ checkUrls(line, CliOptionsParser.OPTION_LIBRARY),
+ line.getOptionValue(CliOptionsParser.OPTION_UPDATE.getOpt())
);
}
catch (ParseException e) {
@@ -254,7 +272,8 @@ public class CliOptionsParser {
null,
checkUrl(line, CliOptionsParser.OPTION_DEFAULTS),
checkUrls(line, CliOptionsParser.OPTION_JAR),
- checkUrls(line, CliOptionsParser.OPTION_LIBRARY)
+ checkUrls(line, CliOptionsParser.OPTION_LIBRARY),
+ null
);
}
catch (ParseException e) {
http://git-wip-us.apache.org/repos/asf/flink/blob/695bc56a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliStrings.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliStrings.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliStrings.java
index b917317..aef7669 100644
--- a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliStrings.java
+++ b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliStrings.java
@@ -49,6 +49,7 @@ public final class CliStrings {
.append(formatCommand(SqlCommand.DESCRIBE, "Describes the schema of a table with the given name."))
.append(formatCommand(SqlCommand.EXPLAIN, "Describes the execution plan of a query or table with the given name."))
.append(formatCommand(SqlCommand.SELECT, "Executes a SQL SELECT query on the Flink cluster."))
+ .append(formatCommand(SqlCommand.INSERT_INTO, "Inserts the results of a SQL SELECT query into a declared table sink."))
.append(formatCommand(SqlCommand.SOURCE, "Reads a SQL SELECT query from a file and executes it on the Flink cluster."))
.append(formatCommand(SqlCommand.SET, "Sets a session configuration property. Syntax: 'SET <key>=<value>'. Use 'SET' for listing all properties."))
.append(formatCommand(SqlCommand.RESET, "Resets all session configuration properties."))
@@ -122,12 +123,18 @@ public final class CliStrings {
public static final String MESSAGE_RESULT_QUIT = "Result retrieval cancelled.";
+ public static final String MESSAGE_SUBMITTING_STATEMENT = "Submitting SQL update statement to the cluster...";
+
+ public static final String MESSAGE_STATEMENT_SUBMITTED = "Table update statement has been successfully submitted to the cluster:";
+
public static final String MESSAGE_INVALID_PATH = "Path is invalid.";
public static final String MESSAGE_MAX_SIZE_EXCEEDED = "The given file exceeds the maximum number of characters.";
public static final String MESSAGE_WILL_EXECUTE = "Executing the following statement:";
+ public static final String MESSAGE_UNSUPPORTED_SQL = "Unsupported SQL statement.";
+
// --------------------------------------------------------------------------------------------
public static final String RESULT_TITLE = "SQL Query Result";
http://git-wip-us.apache.org/repos/asf/flink/blob/695bc56a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/SqlCommandParser.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/SqlCommandParser.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/SqlCommandParser.java
index 5543a3e..376b1f1 100644
--- a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/SqlCommandParser.java
+++ b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/SqlCommandParser.java
@@ -18,6 +18,8 @@
package org.apache.flink.table.client.cli;
+import java.util.Optional;
+
/**
* Simple parser for determining the type of command and its parameters.
*/
@@ -27,7 +29,7 @@ public final class SqlCommandParser {
// private
}
- public static SqlCommandCall parse(String stmt) {
+ public static Optional<SqlCommandCall> parse(String stmt) {
String trimmed = stmt.trim();
// remove ';' at the end because many people type it intuitively
if (trimmed.endsWith(";")) {
@@ -43,10 +45,11 @@ public final class SqlCommandParser {
// match
if (tokenCount < cmd.tokens.length && token.equalsIgnoreCase(cmd.tokens[tokenCount])) {
if (tokenCount == cmd.tokens.length - 1) {
- return new SqlCommandCall(
+ final SqlCommandCall call = new SqlCommandCall(
cmd,
splitOperands(cmd, trimmed, trimmed.substring(Math.min(pos, trimmed.length())))
);
+ return Optional.of(call);
}
} else {
// next sql command
@@ -56,7 +59,7 @@ public final class SqlCommandParser {
}
}
}
- return null;
+ return Optional.empty();
}
private static String[] splitOperands(SqlCommand cmd, String originalCall, String operands) {
@@ -69,6 +72,7 @@ public final class SqlCommandParser {
return new String[] {operands.substring(0, delimiter), operands.substring(delimiter + 1)};
}
case SELECT:
+ case INSERT_INTO:
return new String[] {originalCall};
default:
return new String[] {operands};
@@ -90,6 +94,7 @@ public final class SqlCommandParser {
DESCRIBE("describe"),
EXPLAIN("explain"),
SELECT("select"),
+ INSERT_INTO("insert into"),
SET("set"),
RESET("reset"),
SOURCE("source");
http://git-wip-us.apache.org/repos/asf/flink/blob/695bc56a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/Executor.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/Executor.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/Executor.java
index 9ace240..7f903a4 100644
--- a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/Executor.java
+++ b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/Executor.java
@@ -88,6 +88,15 @@ public interface Executor {
void cancelQuery(SessionContext session, String resultId) throws SqlExecutionException;
/**
+ * Submits a Flink SQL update statement such as INSERT INTO.
+ *
+ * @param session context in which the statement is executed
+ * @param statement SQL update statement (currently only INSERT INTO is supported)
+ * @return information about the target of the submitted Flink job
+ */
+ ProgramTargetDescriptor executeUpdate(SessionContext session, String statement) throws SqlExecutionException;
+
+ /**
* Stops the executor.
*/
void stop(SessionContext session);
http://git-wip-us.apache.org/repos/asf/flink/blob/695bc56a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/ProgramTargetDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/ProgramTargetDescriptor.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/ProgramTargetDescriptor.java
new file mode 100644
index 0000000..7f6b3d3
--- /dev/null
+++ b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/ProgramTargetDescriptor.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.client.gateway;
+
+import org.apache.flink.api.common.JobID;
+
+/**
+ * Describes the target to which a table program has been submitted.
+ */
+public class ProgramTargetDescriptor {
+
+ private final String clusterId;
+
+ private final String jobId;
+
+ private final String webInterfaceUrl;
+
+ public ProgramTargetDescriptor(String clusterId, String jobId, String webInterfaceUrl) {
+ this.clusterId = clusterId;
+ this.jobId = jobId;
+ this.webInterfaceUrl = webInterfaceUrl;
+ }
+
+ public String getClusterId() {
+ return clusterId;
+ }
+
+ public String getJobId() {
+ return jobId;
+ }
+
+ public String getWebInterfaceUrl() {
+ return webInterfaceUrl;
+ }
+
+ @Override
+ public String toString() {
+ return String.format(
+ "Cluster ID: %s\n" +
+ "Job ID: %s\n" +
+ "Web interface: %s",
+ clusterId, jobId, webInterfaceUrl);
+ }
+
+ /**
+ * Creates a program target description from deployment classes.
+ *
+ * @param clusterId cluster id
+ * @param jobId job id
+ * @param <C> cluster id type
+ * @return program target descriptor
+ */
+ public static <C> ProgramTargetDescriptor of(C clusterId, JobID jobId, String webInterfaceUrl) {
+ String clusterIdString;
+ try {
+ // check if cluster id has a toString method
+ clusterId.getClass().getDeclaredMethod("toString");
+ clusterIdString = clusterId.toString();
+ } catch (NoSuchMethodException e) {
+ clusterIdString = clusterId.getClass().getSimpleName();
+ }
+ return new ProgramTargetDescriptor(clusterIdString, jobId.toString(), webInterfaceUrl);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/695bc56a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ChangelogCollectStreamResult.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ChangelogCollectStreamResult.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ChangelogCollectStreamResult.java
deleted file mode 100644
index 2375584..0000000
--- a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ChangelogCollectStreamResult.java
+++ /dev/null
@@ -1,99 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.client.gateway.local;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.table.client.gateway.TypedResult;
-import org.apache.flink.types.Row;
-
-import java.net.InetAddress;
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * Collects results and returns them as a changelog.
- *
- * @param <C> cluster id to which this result belongs to
- */
-public class ChangelogCollectStreamResult<C> extends CollectStreamResult<C> implements ChangelogResult<C> {
-
- private List<Tuple2<Boolean, Row>> changeRecordBuffer;
- private static final int CHANGE_RECORD_BUFFER_SIZE = 5_000;
-
- public ChangelogCollectStreamResult(TypeInformation<Row> outputType, ExecutionConfig config,
- InetAddress gatewayAddress, int gatewayPort) {
- super(outputType, config, gatewayAddress, gatewayPort);
-
- // prepare for changelog
- changeRecordBuffer = new ArrayList<>();
- }
-
- @Override
- public boolean isMaterialized() {
- return false;
- }
-
- @Override
- public TypedResult<List<Tuple2<Boolean, Row>>> retrieveChanges() {
- synchronized (resultLock) {
- // retrieval thread is alive return a record if available
- // but the program must not have failed
- if (isRetrieving() && executionException == null) {
- if (changeRecordBuffer.isEmpty()) {
- return TypedResult.empty();
- } else {
- final List<Tuple2<Boolean, Row>> change = new ArrayList<>(changeRecordBuffer);
- changeRecordBuffer.clear();
- resultLock.notify();
- return TypedResult.payload(change);
- }
- }
- // retrieval thread is dead but there is still a record to be delivered
- else if (!isRetrieving() && !changeRecordBuffer.isEmpty()) {
- final List<Tuple2<Boolean, Row>> change = new ArrayList<>(changeRecordBuffer);
- changeRecordBuffer.clear();
- return TypedResult.payload(change);
- }
- // no results can be returned anymore
- else {
- return handleMissingResult();
- }
- }
- }
-
- // --------------------------------------------------------------------------------------------
-
- @Override
- protected void processRecord(Tuple2<Boolean, Row> change) {
- synchronized (resultLock) {
- // wait if the buffer is full
- if (changeRecordBuffer.size() >= CHANGE_RECORD_BUFFER_SIZE) {
- try {
- resultLock.wait();
- } catch (InterruptedException e) {
- // ignore
- }
- } else {
- changeRecordBuffer.add(change);
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/695bc56a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ChangelogResult.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ChangelogResult.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ChangelogResult.java
deleted file mode 100644
index 6d4f95a..0000000
--- a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ChangelogResult.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.client.gateway.local;
-
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.table.client.gateway.TypedResult;
-import org.apache.flink.types.Row;
-
-import java.util.List;
-
-/**
- * A result that is represented as a changelog consisting of insert and delete records.
- *
- * @param <C> cluster id to which this result belongs to
- */
-public interface ChangelogResult<C> extends DynamicResult<C> {
-
- /**
- * Retrieves the available result records.
- */
- TypedResult<List<Tuple2<Boolean, Row>>> retrieveChanges();
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/695bc56a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/CollectStreamResult.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/CollectStreamResult.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/CollectStreamResult.java
deleted file mode 100644
index b7a0e79..0000000
--- a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/CollectStreamResult.java
+++ /dev/null
@@ -1,175 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.client.gateway.local;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeinfo.Types;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.datastream.DataStreamUtils;
-import org.apache.flink.streaming.experimental.SocketStreamIterator;
-import org.apache.flink.table.client.SqlClientException;
-import org.apache.flink.table.client.gateway.SqlExecutionException;
-import org.apache.flink.table.client.gateway.TypedResult;
-import org.apache.flink.table.sinks.TableSink;
-import org.apache.flink.types.Row;
-
-import java.io.IOException;
-import java.net.InetAddress;
-
-/**
- * A result that works similarly to {@link DataStreamUtils#collect(DataStream)}.
- *
- * @param <C> cluster id to which this result belongs
- */
-public abstract class CollectStreamResult<C> implements DynamicResult<C> {
-
- private final TypeInformation<Row> outputType;
- private final SocketStreamIterator<Tuple2<Boolean, Row>> iterator;
- private final CollectStreamTableSink collectTableSink;
- private final ResultRetrievalThread retrievalThread;
- private final JobMonitoringThread monitoringThread;
- private ProgramDeployer<C> deployer;
- private C clusterId;
-
- protected final Object resultLock;
- protected SqlExecutionException executionException;
-
- public CollectStreamResult(TypeInformation<Row> outputType, ExecutionConfig config,
- InetAddress gatewayAddress, int gatewayPort) {
- this.outputType = outputType;
-
- resultLock = new Object();
-
- // create socket stream iterator
- final TypeInformation<Tuple2<Boolean, Row>> socketType = Types.TUPLE(Types.BOOLEAN, outputType);
- final TypeSerializer<Tuple2<Boolean, Row>> serializer = socketType.createSerializer(config);
- try {
- // pass gateway port and address such that iterator knows where to bind to
- iterator = new SocketStreamIterator<>(gatewayPort, gatewayAddress, serializer);
- } catch (IOException e) {
- throw new SqlClientException("Could not start socket for result retrieval.", e);
- }
-
- // create table sink
- // pass binding address and port such that sink knows where to send to
- collectTableSink = new CollectStreamTableSink(iterator.getBindAddress(), iterator.getPort(), serializer);
- retrievalThread = new ResultRetrievalThread();
- monitoringThread = new JobMonitoringThread();
- }
-
- @Override
- public void setClusterId(C clusterId) {
- if (this.clusterId != null) {
- throw new IllegalStateException("Cluster id is already present.");
- }
- this.clusterId = clusterId;
- }
-
- @Override
- public TypeInformation<Row> getOutputType() {
- return outputType;
- }
-
- @Override
- public void startRetrieval(ProgramDeployer<C> deployer) {
- // start listener thread
- retrievalThread.start();
-
- // start deployer
- this.deployer = deployer;
- monitoringThread.start();
- }
-
- @Override
- public TableSink<?> getTableSink() {
- return collectTableSink;
- }
-
- @Override
- public void close() {
- retrievalThread.isRunning = false;
- retrievalThread.interrupt();
- monitoringThread.interrupt();
- iterator.close();
- }
-
- // --------------------------------------------------------------------------------------------
-
- protected <T> TypedResult<T> handleMissingResult() {
- // check if the monitoring thread is still there
- // we need to wait until we know what is going on
- if (monitoringThread.isAlive()) {
- return TypedResult.empty();
- }
- // the job finished with an exception
- else if (executionException != null) {
- throw executionException;
- }
- // we assume that a bounded job finished
- else {
- return TypedResult.endOfStream();
- }
- }
-
- protected boolean isRetrieving() {
- return retrievalThread.isRunning;
- }
-
- protected abstract void processRecord(Tuple2<Boolean, Row> change);
-
- // --------------------------------------------------------------------------------------------
-
- private class JobMonitoringThread extends Thread {
-
- @Override
- public void run() {
- try {
- deployer.run();
- } catch (SqlExecutionException e) {
- executionException = e;
- }
- }
- }
-
- // --------------------------------------------------------------------------------------------
-
- private class ResultRetrievalThread extends Thread {
-
- public volatile boolean isRunning = true;
-
- @Override
- public void run() {
- try {
- while (isRunning && iterator.hasNext()) {
- final Tuple2<Boolean, Row> change = iterator.next();
- processRecord(change);
- }
- } catch (RuntimeException e) {
- // ignore socket exceptions
- }
-
- // no result anymore
- // either the job is done or an error occurred
- isRunning = false;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/695bc56a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/DynamicResult.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/DynamicResult.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/DynamicResult.java
deleted file mode 100644
index 7c6f8c2..0000000
--- a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/DynamicResult.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.client.gateway.local;
-
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.table.sinks.TableSink;
-import org.apache.flink.types.Row;
-
-/**
- * A result of a dynamic table program.
- *
- * <p>Note: Make sure to call close() after the result is not needed anymore.
- *
- * @param <C> type of the cluster id to which this result belongs
- */
-public interface DynamicResult<C> {
-
- /**
- * Sets the cluster id of the cluster this result comes from. This method should only be called once.
- */
- void setClusterId(C clusterId);
-
- /**
- * Returns whether this result is materialized such that snapshots can be taken or results
- * must be retrieved record-wise.
- */
- boolean isMaterialized();
-
- /**
- * Returns the output type as defined by the query.
- */
- TypeInformation<Row> getOutputType();
-
- /**
- * Starts the table program using the given deployer and monitors its execution.
- */
- void startRetrieval(ProgramDeployer<C> deployer);
-
- /**
- * Returns the table sink required by this result type.
- */
- TableSink<?> getTableSink();
-
- /**
- * Closes the retrieval and all involved threads.
- */
- void close();
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/695bc56a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
index 13e282a..926bdb0 100644
--- a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
+++ b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
@@ -169,6 +169,14 @@ public class ExecutionContext<T> {
return new EnvironmentInstance();
}
+ public Map<String, TableSource<?>> getTableSources() {
+ return tableSources;
+ }
+
+ public Map<String, TableSink<?>> getTableSinks() {
+ return tableSinks;
+ }
+
// --------------------------------------------------------------------------------------------
private static CommandLine createCommandLine(Deployment deployment, Options commandLineOptions) {
http://git-wip-us.apache.org/repos/asf/flink/blob/695bc56a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
index d658ee9..b2e8271 100644
--- a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
+++ b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
@@ -31,16 +31,22 @@ import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.table.api.QueryConfig;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.client.SqlClientException;
import org.apache.flink.table.client.config.Environment;
import org.apache.flink.table.client.gateway.Executor;
+import org.apache.flink.table.client.gateway.ProgramTargetDescriptor;
import org.apache.flink.table.client.gateway.ResultDescriptor;
import org.apache.flink.table.client.gateway.SessionContext;
import org.apache.flink.table.client.gateway.SqlExecutionException;
import org.apache.flink.table.client.gateway.TypedResult;
+import org.apache.flink.table.client.gateway.local.result.BasicResult;
+import org.apache.flink.table.client.gateway.local.result.ChangelogResult;
+import org.apache.flink.table.client.gateway.local.result.DynamicResult;
+import org.apache.flink.table.client.gateway.local.result.MaterializedResult;
import org.apache.flink.types.Row;
import org.apache.flink.util.StringUtils;
@@ -277,6 +283,12 @@ public class LocalExecutor implements Executor {
}
@Override
+ public ProgramTargetDescriptor executeUpdate(SessionContext session, String statement) throws SqlExecutionException {
+ final ExecutionContext<?> context = getOrCreateExecutionContext(session);
+ return executeUpdateInternal(context, statement);
+ }
+
+ @Override
public void stop(SessionContext session) {
resultStore.getResults().forEach((resultId) -> {
try {
@@ -329,14 +341,43 @@ public class LocalExecutor implements Executor {
}
}
- private <T> ResultDescriptor executeQueryInternal(ExecutionContext<T> context, String query) {
+ private <C> ProgramTargetDescriptor executeUpdateInternal(ExecutionContext<C> context, String statement) {
+ final ExecutionContext.EnvironmentInstance envInst = context.createEnvironmentInstance();
+
+ applyUpdate(envInst.getTableEnvironment(), envInst.getQueryConfig(), statement);
+
+ // create job graph with dependencies
+ final String jobName = context.getSessionContext().getName() + ": " + statement;
+ final JobGraph jobGraph;
+ try {
+ jobGraph = envInst.createJobGraph(jobName);
+ } catch (Throwable t) {
+ // catch everything such that the statement does not crash the executor
+ throw new SqlExecutionException("Invalid SQL statement.", t);
+ }
+
+ // create execution
+ final BasicResult<C> result = new BasicResult<>();
+ final ProgramDeployer<C> deployer = new ProgramDeployer<>(
+ context, jobName, jobGraph, result, false);
+
+ // blocking deployment
+ deployer.run();
+
+ return ProgramTargetDescriptor.of(
+ result.getClusterId(),
+ jobGraph.getJobID(),
+ result.getWebInterfaceUrl());
+ }
+
+ private <C> ResultDescriptor executeQueryInternal(ExecutionContext<C> context, String query) {
final ExecutionContext.EnvironmentInstance envInst = context.createEnvironmentInstance();
// create table
final Table table = createTable(envInst.getTableEnvironment(), query);
// initialize result
- final DynamicResult<T> result = resultStore.createResult(
+ final DynamicResult<C> result = resultStore.createResult(
context.getMergedEnvironment(),
table.getSchema().withoutTimeAttributes(),
envInst.getExecutionConfig());
@@ -352,7 +393,7 @@ public class LocalExecutor implements Executor {
// it is not stored in the result store
result.close();
// catch everything such that the query does not crash the executor
- throw new SqlExecutionException("Invalid SQL statement.", t);
+ throw new SqlExecutionException("Invalid SQL query.", t);
}
// store the result with a unique id (the job id for now)
@@ -360,7 +401,8 @@ public class LocalExecutor implements Executor {
resultStore.storeResult(resultId, result);
// create execution
- final ProgramDeployer<T> deployer = new ProgramDeployer<>(context, jobName, jobGraph, result);
+ final ProgramDeployer<C> deployer = new ProgramDeployer<>(
+ context, jobName, jobGraph, result, true);
// start result retrieval
result.startRetrieval(deployer);
@@ -371,10 +413,13 @@ public class LocalExecutor implements Executor {
result.isMaterialized());
}
- private Table createTable(TableEnvironment tableEnv, String query) {
+ /**
+ * Creates a table using the given query in the given table environment.
+ */
+ private Table createTable(TableEnvironment tableEnv, String selectQuery) {
// parse and validate query
try {
- return tableEnv.sqlQuery(query);
+ return tableEnv.sqlQuery(selectQuery);
} catch (Throwable t) {
// catch everything such that the query does not crash the executor
throw new SqlExecutionException("Invalid SQL statement.", t);
@@ -382,6 +427,19 @@ public class LocalExecutor implements Executor {
}
/**
+ * Applies the given update statement to the given table environment with query configuration.
+ */
+ private void applyUpdate(TableEnvironment tableEnv, QueryConfig queryConfig, String updateStatement) {
+ // parse and validate statement
+ try {
+ tableEnv.sqlUpdate(updateStatement, queryConfig);
+ } catch (Throwable t) {
+ // catch everything such that the statement does not crash the executor
+ throw new SqlExecutionException("Invalid SQL update statement.", t);
+ }
+ }
+
+ /**
* Creates or reuses the execution context.
*/
private synchronized ExecutionContext<?> getOrCreateExecutionContext(SessionContext session) throws SqlExecutionException {
http://git-wip-us.apache.org/repos/asf/flink/blob/695bc56a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/MaterializedCollectBatchResult.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/MaterializedCollectBatchResult.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/MaterializedCollectBatchResult.java
deleted file mode 100644
index 3bdc1fa..0000000
--- a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/MaterializedCollectBatchResult.java
+++ /dev/null
@@ -1,159 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.client.gateway.local;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.JobExecutionResult;
-import org.apache.flink.api.common.accumulators.SerializedListAccumulator;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.table.client.gateway.SqlExecutionException;
-import org.apache.flink.table.client.gateway.TypedResult;
-import org.apache.flink.table.sinks.TableSink;
-import org.apache.flink.types.Row;
-import org.apache.flink.util.AbstractID;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * Collects results using accumulators and returns them as table snapshots.
- */
-public class MaterializedCollectBatchResult<C> implements MaterializedResult<C> {
-
- private final TypeInformation<Row> outputType;
- private final String accumulatorName;
- private final CollectBatchTableSink tableSink;
- private final Object resultLock;
- private final Thread retrievalThread;
-
- private ProgramDeployer<C> deployer;
- private C clusterId;
- private int pageSize;
- private int pageCount;
- private SqlExecutionException executionException;
- private List<Row> resultTable;
-
- private volatile boolean snapshotted = false;
-
- public MaterializedCollectBatchResult(TypeInformation<Row> outputType, ExecutionConfig config) {
- this.outputType = outputType;
-
- accumulatorName = new AbstractID().toString();
- tableSink = new CollectBatchTableSink(accumulatorName, outputType.createSerializer(config));
- resultLock = new Object();
- retrievalThread = new ResultRetrievalThread();
-
- pageCount = 0;
- }
-
- @Override
- public void setClusterId(C clusterId) {
- if (this.clusterId != null) {
- throw new IllegalStateException("Cluster id is already present.");
- }
- this.clusterId = clusterId;
- }
-
- @Override
- public boolean isMaterialized() {
- return true;
- }
-
- @Override
- public TypeInformation<Row> getOutputType() {
- return outputType;
- }
-
- @Override
- public void startRetrieval(ProgramDeployer<C> deployer) {
- this.deployer = deployer;
- retrievalThread.start();
- }
-
- @Override
- public TableSink<?> getTableSink() {
- return tableSink;
- }
-
- @Override
- public void close() {
- retrievalThread.interrupt();
- }
-
- @Override
- public List<Row> retrievePage(int page) {
- synchronized (resultLock) {
- if (page <= 0 || page > pageCount) {
- throw new SqlExecutionException("Invalid page '" + page + "'.");
- }
- return resultTable.subList(pageSize * (page - 1), Math.min(resultTable.size(), page * pageSize));
- }
- }
-
- @Override
- public TypedResult<Integer> snapshot(int pageSize) {
- synchronized (resultLock) {
- // wait for a result
- if (retrievalThread.isAlive() && null == resultTable) {
- return TypedResult.empty();
- }
- // the job finished with an exception
- else if (executionException != null) {
- throw executionException;
- }
- // we return a payload result the first time and EoS for the rest of times as if the results
- // are retrieved dynamically
- else if (!snapshotted) {
- snapshotted = true;
- this.pageSize = pageSize;
- pageCount = Math.max(1, (int) Math.ceil(((double) resultTable.size() / pageSize)));
- return TypedResult.payload(pageCount);
- } else {
- return TypedResult.endOfStream();
- }
- }
- }
-
- // --------------------------------------------------------------------------------------------
-
- private class ResultRetrievalThread extends Thread {
-
- @Override
- public void run() {
- try {
- deployer.run();
- final JobExecutionResult result = deployer.fetchExecutionResult();
- final ArrayList<byte[]> accResult = result.getAccumulatorResult(accumulatorName);
- if (accResult == null) {
- throw new SqlExecutionException("The accumulator could not retrieve the result.");
- }
- final List<Row> resultTable = SerializedListAccumulator.deserializeList(accResult, tableSink.getSerializer());
- // sets the result table all at once
- synchronized (resultLock) {
- MaterializedCollectBatchResult.this.resultTable = resultTable;
- }
- } catch (ClassNotFoundException | IOException e) {
- executionException = new SqlExecutionException("Serialization error while deserializing collected data.", e);
- } catch (SqlExecutionException e) {
- executionException = e;
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/695bc56a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/MaterializedCollectStreamResult.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/MaterializedCollectStreamResult.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/MaterializedCollectStreamResult.java
deleted file mode 100644
index 0ce270e..0000000
--- a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/MaterializedCollectStreamResult.java
+++ /dev/null
@@ -1,135 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.client.gateway.local;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.table.client.gateway.SqlExecutionException;
-import org.apache.flink.table.client.gateway.TypedResult;
-import org.apache.flink.types.Row;
-
-import java.net.InetAddress;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-/**
- * Collects results and returns them as table snapshots.
- *
- * @param <C> cluster id to which this result belongs
- */
-public class MaterializedCollectStreamResult<C> extends CollectStreamResult<C> implements MaterializedResult<C> {
-
- private final List<Row> materializedTable;
- private final Map<Row, List<Integer>> rowPositions; // positions of rows in table for faster access
- private final List<Row> snapshot;
- private int pageCount;
- private int pageSize;
- private boolean isLastSnapshot;
-
- public MaterializedCollectStreamResult(TypeInformation<Row> outputType, ExecutionConfig config,
- InetAddress gatewayAddress, int gatewayPort) {
- super(outputType, config, gatewayAddress, gatewayPort);
-
- // prepare for materialization
- materializedTable = new ArrayList<>();
- rowPositions = new HashMap<>();
- snapshot = new ArrayList<>();
- isLastSnapshot = false;
- pageCount = 0;
- }
-
- @Override
- public boolean isMaterialized() {
- return true;
- }
-
- @Override
- public TypedResult<Integer> snapshot(int pageSize) {
- synchronized (resultLock) {
- // retrieval thread is dead and there are no results anymore
- // or program failed
- if ((!isRetrieving() && isLastSnapshot) || executionException != null) {
- return handleMissingResult();
- }
- // this snapshot is the last result that can be delivered
- else if (!isRetrieving()) {
- isLastSnapshot = true;
- }
-
- this.pageSize = pageSize;
- snapshot.clear();
- snapshot.addAll(materializedTable);
-
- // at least one page
- pageCount = Math.max(1, (int) Math.ceil(((double) snapshot.size() / pageSize)));
-
- return TypedResult.payload(pageCount);
- }
- }
-
- @Override
- public List<Row> retrievePage(int page) {
- synchronized (resultLock) {
- if (page <= 0 || page > pageCount) {
- throw new SqlExecutionException("Invalid page '" + page + "'.");
- }
-
- return snapshot.subList(pageSize * (page - 1), Math.min(snapshot.size(), pageSize * page));
- }
- }
-
- // --------------------------------------------------------------------------------------------
-
- @Override
- protected void processRecord(Tuple2<Boolean, Row> change) {
- // we track the position of rows for faster access and in order to return consistent
- // snapshots where new rows are appended at the end
- synchronized (resultLock) {
- final List<Integer> positions = rowPositions.get(change.f1);
-
- // insert
- if (change.f0) {
- materializedTable.add(change.f1);
- if (positions == null) {
- // new row
- final ArrayList<Integer> pos = new ArrayList<>(1);
- pos.add(materializedTable.size() - 1);
- rowPositions.put(change.f1, pos);
- } else {
- // row exists already, only add position
- positions.add(materializedTable.size() - 1);
- }
- }
- // delete
- else {
- if (positions != null) {
- // delete row position and row itself
- final int pos = positions.remove(positions.size() - 1);
- materializedTable.remove(pos);
- if (positions.isEmpty()) {
- rowPositions.remove(change.f1);
- }
- }
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/695bc56a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/MaterializedResult.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/MaterializedResult.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/MaterializedResult.java
deleted file mode 100644
index 858af4d..0000000
--- a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/MaterializedResult.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.client.gateway.local;
-
-import org.apache.flink.table.client.gateway.TypedResult;
-import org.apache.flink.types.Row;
-
-import java.util.List;
-
-/**
- * A result that is materialized and can be viewed by navigating through a snapshot.
- *
- * @param <C> cluster id to which this result belongs
- */
-public interface MaterializedResult<C> extends DynamicResult<C> {
-
- /**
- * Takes a snapshot of the current table and returns the number of pages for navigating
- * through the snapshot.
- */
- TypedResult<Integer> snapshot(int pageSize);
-
- /**
- * Retrieves a page of a snapshotted result.
- */
- List<Row> retrievePage(int page);
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/695bc56a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ProgramDeployer.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ProgramDeployer.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ProgramDeployer.java
index 5c5cf98..05d12ff 100644
--- a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ProgramDeployer.java
+++ b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ProgramDeployer.java
@@ -24,6 +24,7 @@ import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.client.program.rest.RestClusterClient;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.table.client.gateway.SqlExecutionException;
+import org.apache.flink.table.client.gateway.local.result.Result;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -40,18 +41,30 @@ public class ProgramDeployer<C> implements Runnable {
private final ExecutionContext<C> context;
private final JobGraph jobGraph;
private final String jobName;
- private final DynamicResult<C> result;
+ private final Result<C> result;
+ private final boolean awaitJobResult;
private final BlockingQueue<JobExecutionResult> executionResultBucket;
+ /**
+ * Deploys a table program on the cluster.
+ *
+ * @param context context with deployment information
+ * @param jobName job name of the Flink job to be submitted
+ * @param jobGraph Flink job graph
+ * @param result result that receives information about the target cluster
+ * @param awaitJobResult block for a job execution result from the cluster
+ */
public ProgramDeployer(
ExecutionContext<C> context,
String jobName,
JobGraph jobGraph,
- DynamicResult<C> result) {
+ Result<C> result,
+ boolean awaitJobResult) {
this.context = context;
this.jobGraph = jobGraph;
this.jobName = jobName;
this.result = result;
+ this.awaitJobResult = awaitJobResult;
executionResultBucket = new LinkedBlockingDeque<>(1);
}
@@ -62,7 +75,7 @@ public class ProgramDeployer<C> implements Runnable {
LOG.debug("Submitting job {} with the following environment: \n{}",
jobGraph.getJobID(), context.getMergedEnvironment());
}
- executionResultBucket.add(deployJob(context, jobGraph, result));
+ deployJob(context, jobGraph, result);
}
public JobExecutionResult fetchExecutionResult() {
@@ -73,45 +86,20 @@ public class ProgramDeployer<C> implements Runnable {
* Deploys a job. Depending on the deployment creates a new job cluster. It saves the cluster id in
* the result and blocks until job completion.
*/
- private <T> JobExecutionResult deployJob(ExecutionContext<T> context, JobGraph jobGraph, DynamicResult<T> result) {
+ private <T> void deployJob(ExecutionContext<T> context, JobGraph jobGraph, Result<T> result) {
// create or retrieve cluster and deploy job
try (final ClusterDescriptor<T> clusterDescriptor = context.createClusterDescriptor()) {
- ClusterClient<T> clusterClient = null;
try {
// new cluster
if (context.getClusterId() == null) {
- // deploy job cluster with job attached
- clusterClient = clusterDescriptor.deployJobCluster(context.getClusterSpec(), jobGraph, false);
- // save the new cluster id
- result.setClusterId(clusterClient.getClusterId());
- // we need to hard cast for now
- return ((RestClusterClient<T>) clusterClient)
- .requestJobResult(jobGraph.getJobID())
- .get()
- .toJobExecutionResult(context.getClassLoader()); // throws exception if job fails
+ deployJobOnNewCluster(clusterDescriptor, jobGraph, result, context.getClassLoader());
}
// reuse existing cluster
else {
- // retrieve existing cluster
- clusterClient = clusterDescriptor.retrieve(context.getClusterId());
- // save the cluster id
- result.setClusterId(clusterClient.getClusterId());
- // submit the job
- clusterClient.setDetached(false);
- return clusterClient
- .submitJob(jobGraph, context.getClassLoader())
- .getJobExecutionResult(); // throws exception if job fails
+ deployJobOnExistingCluster(context.getClusterId(), clusterDescriptor, jobGraph, result);
}
} catch (Exception e) {
throw new SqlExecutionException("Could not retrieve or create a cluster.", e);
- } finally {
- try {
- if (clusterClient != null) {
- clusterClient.shutdown();
- }
- } catch (Exception e) {
- // ignore
- }
}
} catch (SqlExecutionException e) {
throw e;
@@ -119,5 +107,77 @@ public class ProgramDeployer<C> implements Runnable {
throw new SqlExecutionException("Could not locate a cluster.", e);
}
}
+
+ /**
+  * Deploys a new job cluster with the given job attached and stores the id and the web
+  * interface URL of that cluster in the result.
+  *
+  * <p>If {@code awaitJobResult} is set, the call blocks until the job result is available and
+  * puts it into {@code executionResultBucket}. The cluster client is shut down in any case.
+  *
+  * @param clusterDescriptor descriptor used to deploy the job cluster
+  * @param jobGraph Flink job graph to deploy
+  * @param result result that receives the cluster information
+  * @param classLoader class loader used to convert the job result into a {@code JobExecutionResult}
+  * @throws Exception if the deployment fails or the awaited job fails
+  */
+ private <T> void deployJobOnNewCluster(
+         ClusterDescriptor<T> clusterDescriptor,
+         JobGraph jobGraph,
+         Result<T> result,
+         ClassLoader classLoader) throws Exception {
+     ClusterClient<T> clusterClient = null;
+     try {
+         // deploy job cluster with job attached
+         clusterClient = clusterDescriptor.deployJobCluster(context.getClusterSpec(), jobGraph, false);
+         // save information about the new cluster
+         result.setClusterInformation(clusterClient.getClusterId(), clusterClient.getWebInterfaceURL());
+         // get result
+         if (awaitJobResult) {
+             // we need to hard cast for now
+             final JobExecutionResult jobResult = ((RestClusterClient<T>) clusterClient)
+                 .requestJobResult(jobGraph.getJobID())
+                 .get()
+                 // use the class loader handed in by the caller instead of silently
+                 // falling back to the one of the enclosing deployer
+                 .toJobExecutionResult(classLoader); // throws exception if job fails
+             executionResultBucket.add(jobResult);
+         }
+     } finally {
+         try {
+             if (clusterClient != null) {
+                 clusterClient.shutdown();
+             }
+         } catch (Exception ignored) {
+             // best effort: a failing shutdown must not hide the job result or the original exception
+         }
+     }
+ }
+
+ /**
+  * Submits the job to a cluster that is already running and stores the id and the web
+  * interface URL of that cluster in the result.
+  *
+  * <p>If {@code awaitJobResult} is set, the job is submitted in attached mode and its execution
+  * result is put into {@code executionResultBucket}; otherwise the job is submitted in detached
+  * mode. The cluster client is shut down in any case.
+  *
+  * @param clusterId id of the cluster to retrieve
+  * @param clusterDescriptor descriptor used to retrieve the cluster
+  * @param jobGraph Flink job graph to submit
+  * @param result result that receives the cluster information
+  * @throws Exception if the cluster cannot be retrieved or the submission/job fails
+  */
+ private <T> void deployJobOnExistingCluster(
+         T clusterId,
+         ClusterDescriptor<T> clusterDescriptor,
+         JobGraph jobGraph,
+         Result<T> result) throws Exception {
+     ClusterClient<T> client = null;
+     try {
+         // connect to the cluster that is already running
+         client = clusterDescriptor.retrieve(clusterId);
+         // looking up the web interface URL might fail on legacy pre-FLIP-6 code paths,
+         // in that case the placeholder is kept
+         // TODO remove this once we drop support for legacy deployment code
+         String url = "N/A";
+         try {
+             url = client.getWebInterfaceURL();
+         } catch (Exception ignored) {
+             // keep the placeholder
+         }
+         // publish the cluster information
+         result.setClusterInformation(client.getClusterId(), url);
+         if (!awaitJobResult) {
+             // submit in detached mode, no result is collected
+             client.setDetached(true);
+             client.submitJob(jobGraph, context.getClassLoader());
+         } else {
+             // submit in attached mode and keep the execution result
+             client.setDetached(false);
+             executionResultBucket.add(
+                 client
+                     .submitJob(jobGraph, context.getClassLoader())
+                     .getJobExecutionResult()); // throws exception if job fails
+         }
+     } finally {
+         if (client != null) {
+             try {
+                 client.shutdown();
+             } catch (Exception ignored) {
+                 // best effort
+             }
+         }
+     }
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/695bc56a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ResultStore.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ResultStore.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ResultStore.java
index f70378c..a54160f 100644
--- a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ResultStore.java
+++ b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ResultStore.java
@@ -29,6 +29,10 @@ import org.apache.flink.table.client.SqlClientException;
import org.apache.flink.table.client.config.Deployment;
import org.apache.flink.table.client.config.Environment;
import org.apache.flink.table.client.gateway.SqlExecutionException;
+import org.apache.flink.table.client.gateway.local.result.ChangelogCollectStreamResult;
+import org.apache.flink.table.client.gateway.local.result.DynamicResult;
+import org.apache.flink.table.client.gateway.local.result.MaterializedCollectBatchResult;
+import org.apache.flink.table.client.gateway.local.result.MaterializedCollectStreamResult;
import org.apache.flink.types.Row;
import java.net.InetAddress;
http://git-wip-us.apache.org/repos/asf/flink/blob/695bc56a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/result/BasicResult.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/result/BasicResult.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/result/BasicResult.java
new file mode 100644
index 0000000..47e0122
--- /dev/null
+++ b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/result/BasicResult.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.client.gateway.local.result;
+
+/**
+ * Minimal result of a table program that was submitted to a cluster. It only remembers on which
+ * cluster the program runs and where the web interface of that cluster can be found.
+ *
+ * @param <C> type of the cluster id to which this result belongs
+ */
+public class BasicResult<C> implements Result<C> {
+
+    protected C clusterId;
+    protected String webInterfaceUrl;
+
+    /**
+     * Stores the cluster information. This may happen only once per result.
+     *
+     * @throws IllegalStateException if a cluster id or a web interface URL is already present
+     */
+    @Override
+    public void setClusterInformation(C clusterId, String webInterfaceUrl) {
+        final boolean alreadyPresent = this.clusterId != null || this.webInterfaceUrl != null;
+        if (alreadyPresent) {
+            throw new IllegalStateException("Cluster information is already present.");
+        }
+        this.webInterfaceUrl = webInterfaceUrl;
+        this.clusterId = clusterId;
+    }
+
+    /**
+     * Returns the id of the cluster this result belongs to.
+     *
+     * @throws IllegalStateException if no cluster id has been set yet
+     */
+    public C getClusterId() {
+        if (clusterId != null) {
+            return clusterId;
+        }
+        throw new IllegalStateException("Cluster ID has not been set.");
+    }
+
+    /**
+     * Returns the URL of the web interface of the cluster.
+     *
+     * @throws IllegalStateException if no web interface URL has been set yet
+     */
+    public String getWebInterfaceUrl() {
+        if (webInterfaceUrl != null) {
+            return webInterfaceUrl;
+        }
+        throw new IllegalStateException("Cluster web interface URL has not been set.");
+    }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/695bc56a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/result/ChangelogCollectStreamResult.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/result/ChangelogCollectStreamResult.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/result/ChangelogCollectStreamResult.java
new file mode 100644
index 0000000..4e76888
--- /dev/null
+++ b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/result/ChangelogCollectStreamResult.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.client.gateway.local.result;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.table.client.gateway.TypedResult;
+import org.apache.flink.types.Row;
+
+import java.net.InetAddress;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Collects results and returns them as a changelog.
+ *
+ * <p>Incoming changes are buffered until they are fetched with {@link #retrieveChanges()}. If
+ * the buffer is full, the retrieval thread is blocked until the buffer has been drained.
+ *
+ * @param <C> type of the cluster id to which this result belongs
+ */
+public class ChangelogCollectStreamResult<C> extends CollectStreamResult<C> implements ChangelogResult<C> {
+
+    /** Maximum number of buffered change records before the retrieval thread has to wait. */
+    private static final int CHANGE_RECORD_BUFFER_SIZE = 5_000;
+
+    /** Changes that were received but not handed out yet; accessed under {@code resultLock}. */
+    private final List<Tuple2<Boolean, Row>> changeRecordBuffer;
+
+    public ChangelogCollectStreamResult(TypeInformation<Row> outputType, ExecutionConfig config,
+            InetAddress gatewayAddress, int gatewayPort) {
+        super(outputType, config, gatewayAddress, gatewayPort);
+
+        // prepare for changelog
+        changeRecordBuffer = new ArrayList<>();
+    }
+
+    @Override
+    public boolean isMaterialized() {
+        return false;
+    }
+
+    /**
+     * Hands out all buffered changes and empties the buffer.
+     *
+     * @return the buffered changes as payload, an empty result if nothing is available right
+     *         now, or whatever {@code handleMissingResult()} decides once no more results can arrive
+     */
+    @Override
+    public TypedResult<List<Tuple2<Boolean, Row>>> retrieveChanges() {
+        synchronized (resultLock) {
+            // the retrieval thread is alive: return the buffered records if there are any,
+            // but only as long as the program has not failed
+            if (isRetrieving() && executionException == null) {
+                if (changeRecordBuffer.isEmpty()) {
+                    return TypedResult.empty();
+                } else {
+                    final List<Tuple2<Boolean, Row>> change = new ArrayList<>(changeRecordBuffer);
+                    changeRecordBuffer.clear();
+                    // wake up the retrieval thread in case it waits for free buffer space
+                    resultLock.notify();
+                    return TypedResult.payload(change);
+                }
+            }
+            // the retrieval thread is dead but there are still records to be delivered
+            else if (!isRetrieving() && !changeRecordBuffer.isEmpty()) {
+                final List<Tuple2<Boolean, Row>> change = new ArrayList<>(changeRecordBuffer);
+                changeRecordBuffer.clear();
+                return TypedResult.payload(change);
+            }
+            // no results can be returned anymore
+            else {
+                return handleMissingResult();
+            }
+        }
+    }
+
+    // --------------------------------------------------------------------------------------------
+
+    /**
+     * Appends a change to the buffer. Blocks while the buffer is full.
+     *
+     * <p>The record is added once space is available; previously a record that arrived while
+     * the buffer was full was silently dropped after the wait.
+     */
+    @Override
+    protected void processRecord(Tuple2<Boolean, Row> change) {
+        synchronized (resultLock) {
+            // wait in a loop: Object#wait may return spuriously, so the condition is re-checked
+            while (changeRecordBuffer.size() >= CHANGE_RECORD_BUFFER_SIZE) {
+                try {
+                    resultLock.wait();
+                } catch (InterruptedException e) {
+                    // the retrieval thread is interrupted when the result is closed;
+                    // keep the interrupt status and give up on this record
+                    Thread.currentThread().interrupt();
+                    return;
+                }
+            }
+            changeRecordBuffer.add(change);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/695bc56a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/result/ChangelogResult.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/result/ChangelogResult.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/result/ChangelogResult.java
new file mode 100644
index 0000000..eb6d6dd
--- /dev/null
+++ b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/result/ChangelogResult.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.client.gateway.local.result;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.table.client.gateway.TypedResult;
+import org.apache.flink.types.Row;
+
+import java.util.List;
+
+/**
+ * A result that is represented as a changelog consisting of insert and delete records.
+ *
+ * <p>Each change is a {@link Tuple2} of a {@link Boolean} flag and a {@link Row}. NOTE(review):
+ * presumably the flag distinguishes insert from delete records; confirm which value means what.
+ *
+ * @param <C> type of the cluster id to which this result belongs
+ */
+public interface ChangelogResult<C> extends DynamicResult<C> {
+
+ /**
+ * Retrieves the change records that are currently available.
+ *
+ * @return typed result that wraps the list of available changes
+ */
+ TypedResult<List<Tuple2<Boolean, Row>>> retrieveChanges();
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/695bc56a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/result/CollectStreamResult.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/result/CollectStreamResult.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/result/CollectStreamResult.java
new file mode 100644
index 0000000..656057b
--- /dev/null
+++ b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/result/CollectStreamResult.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.client.gateway.local.result;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamUtils;
+import org.apache.flink.streaming.experimental.SocketStreamIterator;
+import org.apache.flink.table.client.SqlClientException;
+import org.apache.flink.table.client.gateway.SqlExecutionException;
+import org.apache.flink.table.client.gateway.TypedResult;
+import org.apache.flink.table.client.gateway.local.CollectStreamTableSink;
+import org.apache.flink.table.client.gateway.local.ProgramDeployer;
+import org.apache.flink.table.sinks.TableSink;
+import org.apache.flink.types.Row;
+
+import java.io.IOException;
+import java.net.InetAddress;
+
+/**
+ * A result that works similarly to {@link DataStreamUtils#collect(DataStream)}.
+ *
+ * <p>Two threads are used: a retrieval thread that reads records from a socket and passes them
+ * to {@link #processRecord(Tuple2)}, and a monitoring thread that runs the program deployer and
+ * remembers a failure of the program.
+ *
+ * @param <C> type of the cluster id to which this result belongs
+ */
+public abstract class CollectStreamResult<C> extends BasicResult<C> implements DynamicResult<C> {
+
+    private final TypeInformation<Row> outputType;
+    private final SocketStreamIterator<Tuple2<Boolean, Row>> iterator;
+    private final CollectStreamTableSink collectTableSink;
+    private final ResultRetrievalThread retrievalThread;
+    private final JobMonitoringThread monitoringThread;
+    private ProgramDeployer<C> deployer;
+
+    /** Lock that subclasses use to guard their result buffers. */
+    protected final Object resultLock;
+
+    /**
+     * Failure of the program, if any. It is written by the monitoring thread and read by other
+     * threads, therefore it is volatile to make the write visible without further locking.
+     */
+    protected volatile SqlExecutionException executionException;
+
+    /**
+     * Opens the socket that receives the results and creates the matching table sink. No thread
+     * is started here, see {@link #startRetrieval(ProgramDeployer)}.
+     *
+     * @param outputType type of the rows produced by the program
+     * @param config execution config used to create the serializer
+     * @param gatewayAddress address the socket iterator binds to
+     * @param gatewayPort port the socket iterator binds to
+     * @throws SqlClientException if the socket cannot be opened
+     */
+    public CollectStreamResult(TypeInformation<Row> outputType, ExecutionConfig config,
+            InetAddress gatewayAddress, int gatewayPort) {
+        this.outputType = outputType;
+
+        resultLock = new Object();
+
+        // create socket stream iterator
+        final TypeInformation<Tuple2<Boolean, Row>> socketType = Types.TUPLE(Types.BOOLEAN, outputType);
+        final TypeSerializer<Tuple2<Boolean, Row>> serializer = socketType.createSerializer(config);
+        try {
+            // pass gateway port and address such that iterator knows where to bind to
+            iterator = new SocketStreamIterator<>(gatewayPort, gatewayAddress, serializer);
+        } catch (IOException e) {
+            throw new SqlClientException("Could not start socket for result retrieval.", e);
+        }
+
+        // create table sink
+        // pass binding address and port such that sink knows where to send to
+        collectTableSink = new CollectStreamTableSink(iterator.getBindAddress(), iterator.getPort(), serializer);
+        retrievalThread = new ResultRetrievalThread();
+        monitoringThread = new JobMonitoringThread();
+    }
+
+    @Override
+    public TypeInformation<Row> getOutputType() {
+        return outputType;
+    }
+
+    /**
+     * Starts the retrieval thread first and then the monitoring thread that runs the deployer.
+     */
+    @Override
+    public void startRetrieval(ProgramDeployer<C> deployer) {
+        // start listener thread
+        retrievalThread.start();
+
+        // start deployer
+        // the field is assigned before the thread is started, so the thread sees the deployer
+        this.deployer = deployer;
+        monitoringThread.start();
+    }
+
+    @Override
+    public TableSink<?> getTableSink() {
+        return collectTableSink;
+    }
+
+    /**
+     * Stops the retrieval, interrupts both threads, and closes the socket iterator.
+     */
+    @Override
+    public void close() {
+        retrievalThread.isRunning = false;
+        retrievalThread.interrupt();
+        monitoringThread.interrupt();
+        iterator.close();
+    }
+
+    // --------------------------------------------------------------------------------------------
+
+    /**
+     * Decides what to report if no result record is available.
+     *
+     * @return an empty result while the monitoring thread is still alive, end-of-stream if it
+     *         terminated without a failure
+     * @throws SqlExecutionException if the monitoring thread terminated with a failure
+     */
+    protected <T> TypedResult<T> handleMissingResult() {
+        // check if the monitoring thread is still there
+        // we need to wait until we know what is going on
+        if (monitoringThread.isAlive()) {
+            return TypedResult.empty();
+        }
+        // the job finished with an exception
+        else if (executionException != null) {
+            throw executionException;
+        }
+        // we assume that a bounded job finished
+        else {
+            return TypedResult.endOfStream();
+        }
+    }
+
+    /**
+     * Returns whether the retrieval thread is still marked as running.
+     */
+    protected boolean isRetrieving() {
+        return retrievalThread.isRunning;
+    }
+
+    /**
+     * Handles a single record read from the socket; called from the retrieval thread.
+     */
+    protected abstract void processRecord(Tuple2<Boolean, Row> change);
+
+    // --------------------------------------------------------------------------------------------
+
+    /**
+     * Thread that runs the deployer and stores a {@link SqlExecutionException} thrown by it.
+     */
+    private class JobMonitoringThread extends Thread {
+
+        @Override
+        public void run() {
+            try {
+                deployer.run();
+            } catch (SqlExecutionException e) {
+                executionException = e;
+            }
+        }
+    }
+
+    // --------------------------------------------------------------------------------------------
+
+    /**
+     * Thread that reads records from the socket iterator and passes them to
+     * {@link #processRecord(Tuple2)} until it is stopped or no more records arrive.
+     */
+    private class ResultRetrievalThread extends Thread {
+
+        public volatile boolean isRunning = true;
+
+        @Override
+        public void run() {
+            try {
+                while (isRunning && iterator.hasNext()) {
+                    final Tuple2<Boolean, Row> change = iterator.next();
+                    processRecord(change);
+                }
+            } catch (RuntimeException e) {
+                // ignore socket exceptions
+            }
+
+            // no result anymore
+            // either the job is done or an error occurred
+            isRunning = false;
+        }
+    }
+}