You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2018/07/15 19:49:14 UTC

[2/2] flink git commit: [FLINK-8858] [sql-client] Add support for INSERT INTO in SQL Client

[FLINK-8858] [sql-client] Add support for INSERT INTO in SQL Client

This closes #6332.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/695bc56a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/695bc56a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/695bc56a

Branch: refs/heads/master
Commit: 695bc56a9e20b9d86eea14a02899b400d324a7ea
Parents: a7be2e1
Author: Timo Walther <tw...@apache.org>
Authored: Thu Jul 5 18:11:18 2018 +0200
Committer: Timo Walther <tw...@apache.org>
Committed: Sun Jul 15 21:47:31 2018 +0200

----------------------------------------------------------------------
 flink-libraries/flink-sql-client/pom.xml        |   6 +-
 .../apache/flink/table/client/SqlClient.java    |  26 ++-
 .../flink/table/client/cli/CliClient.java       | 146 +++++++++++-----
 .../flink/table/client/cli/CliOptions.java      |  15 +-
 .../table/client/cli/CliOptionsParser.java      |  25 ++-
 .../flink/table/client/cli/CliStrings.java      |   7 +
 .../table/client/cli/SqlCommandParser.java      |  11 +-
 .../flink/table/client/gateway/Executor.java    |   9 +
 .../client/gateway/ProgramTargetDescriptor.java |  80 +++++++++
 .../local/ChangelogCollectStreamResult.java     |  99 -----------
 .../client/gateway/local/ChangelogResult.java   |  38 ----
 .../gateway/local/CollectStreamResult.java      | 175 -------------------
 .../client/gateway/local/DynamicResult.java     |  65 -------
 .../client/gateway/local/ExecutionContext.java  |   8 +
 .../client/gateway/local/LocalExecutor.java     |  70 +++++++-
 .../local/MaterializedCollectBatchResult.java   | 159 -----------------
 .../local/MaterializedCollectStreamResult.java  | 135 --------------
 .../gateway/local/MaterializedResult.java       |  43 -----
 .../client/gateway/local/ProgramDeployer.java   | 122 +++++++++----
 .../table/client/gateway/local/ResultStore.java |   4 +
 .../gateway/local/result/BasicResult.java       |  53 ++++++
 .../result/ChangelogCollectStreamResult.java    |  99 +++++++++++
 .../gateway/local/result/ChangelogResult.java   |  38 ++++
 .../local/result/CollectStreamResult.java       | 168 ++++++++++++++++++
 .../gateway/local/result/DynamicResult.java     |  61 +++++++
 .../result/MaterializedCollectBatchResult.java  | 152 ++++++++++++++++
 .../result/MaterializedCollectStreamResult.java | 135 ++++++++++++++
 .../local/result/MaterializedResult.java        |  43 +++++
 .../client/gateway/local/result/Result.java     |  33 ++++
 .../src/test/assembly/test-table-factories.xml  |  49 ++++++
 .../test/assembly/test-table-source-factory.xml |  47 -----
 .../flink/table/client/cli/CliClientTest.java   | 161 +++++++++++++++++
 .../client/gateway/local/DependencyTest.java    |  14 +-
 .../client/gateway/local/EnvironmentTest.java   |   1 +
 .../gateway/local/ExecutionContextTest.java     |  51 ++++++
 .../gateway/local/LocalExecutorITCase.java      |  66 ++++++-
 .../gateway/utils/TestTableSinkFactory.java     | 126 +++++++++++++
 .../gateway/utils/TestTableSourceFactory.java   |   8 +-
 .../test/resources/test-factory-services-file   |   1 +
 .../resources/test-sql-client-defaults.yaml     |  17 ++
 .../test/resources/test-sql-client-factory.yaml |   2 +-
 .../plan/nodes/PhysicalTableSourceScan.scala    |   3 +-
 42 files changed, 1705 insertions(+), 866 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/695bc56a/flink-libraries/flink-sql-client/pom.xml
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-sql-client/pom.xml b/flink-libraries/flink-sql-client/pom.xml
index a9fc00e..2c6e2ba 100644
--- a/flink-libraries/flink-sql-client/pom.xml
+++ b/flink-libraries/flink-sql-client/pom.xml
@@ -177,16 +177,16 @@ under the License.
 				<version>2.4</version>
 				<executions>
 					<execution>
-						<id>create-table-source-factory-jar</id>
+						<id>create-table-factories-jar</id>
 						<phase>process-test-classes</phase>
 						<goals>
 							<goal>single</goal>
 						</goals>
 						<configuration>
-							<finalName>table-source-factory</finalName>
+							<finalName>table-factories</finalName>
 							<attach>false</attach>
 							<descriptors>
-								<descriptor>src/test/assembly/test-table-source-factory.xml</descriptor>
+								<descriptor>src/test/assembly/test-table-factories.xml</descriptor>
 							</descriptors>
 						</configuration>
 					</execution>

http://git-wip-us.apache.org/repos/asf/flink/blob/695bc56a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/SqlClient.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/SqlClient.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/SqlClient.java
index 65d986c..3efaa6a 100644
--- a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/SqlClient.java
+++ b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/SqlClient.java
@@ -97,14 +97,34 @@ public class SqlClient {
 			// add shutdown hook
 			Runtime.getRuntime().addShutdownHook(new EmbeddedShutdownThread(context, executor));
 
-			// start CLI
-			final CliClient cli = new CliClient(context, executor);
-			cli.open();
+			// do the actual work
+			openCli(context, executor);
 		} else {
 			throw new SqlClientException("Gateway mode is not supported yet.");
 		}
 	}
 
+	/**
+	 * Opens the CLI client for executing SQL statements.
+	 *
+	 * @param context session context
+	 * @param executor executor
+	 */
+	private void openCli(SessionContext context, Executor executor) {
+		final CliClient cli = new CliClient(context, executor);
+		// interactive CLI mode
+		if (options.getUpdateStatement() == null) {
+			cli.open();
+		}
+		// execute single update statement
+		else {
+			final boolean success = cli.submitUpdate(options.getUpdateStatement());
+			if (!success) {
+				throw new SqlClientException("Could not submit given SQL update statement to cluster.");
+			}
+		}
+	}
+
 	// --------------------------------------------------------------------------------------------
 
 	private static void shutdown(SessionContext context, Executor executor) {

http://git-wip-us.apache.org/repos/asf/flink/blob/695bc56a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliClient.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliClient.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliClient.java
index 7bf7bb7..576f518 100644
--- a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliClient.java
+++ b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliClient.java
@@ -20,9 +20,9 @@ package org.apache.flink.table.client.cli;
 
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.client.SqlClientException;
-import org.apache.flink.table.client.cli.SqlCommandParser.SqlCommand;
 import org.apache.flink.table.client.cli.SqlCommandParser.SqlCommandCall;
 import org.apache.flink.table.client.gateway.Executor;
+import org.apache.flink.table.client.gateway.ProgramTargetDescriptor;
 import org.apache.flink.table.client.gateway.ResultDescriptor;
 import org.apache.flink.table.client.gateway.SessionContext;
 import org.apache.flink.table.client.gateway.SqlExecutionException;
@@ -50,6 +50,7 @@ import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 
 /**
  * SQL CLI client.
@@ -85,6 +86,9 @@ public class CliClient {
 			terminal = TerminalBuilder.builder()
 				.name(CliStrings.CLI_NAME)
 				.build();
+			// make space from previous output and test the writer
+			terminal.writer().println();
+			terminal.writer().flush();
 		} catch (IOException e) {
 			throw new SqlClientException("Error opening command line interface.", e);
 		}
@@ -149,6 +153,9 @@ public class CliClient {
 		return executor;
 	}
 
+	/**
+	 * Opens the interactive CLI shell.
+	 */
 	public void open() {
 		isRunning = true;
 
@@ -173,55 +180,91 @@ public class CliClient {
 			if (line == null || line.equals("")) {
 				continue;
 			}
+			final Optional<SqlCommandCall> cmdCall = parseCommand(line);
+			cmdCall.ifPresent(this::callCommand);
+		}
+	}
 
-			final SqlCommandCall cmdCall = SqlCommandParser.parse(line);
-
-			if (cmdCall == null) {
-				terminal.writer().println(CliStrings.messageError(CliStrings.MESSAGE_UNKNOWN_SQL));
-				continue;
-			}
+	/**
+	 * Submits a SQL update statement and prints status information and/or errors on the terminal.
+	 *
+	 * @param statement SQL update statement
+	 * @return flag to indicate if the submission was successful or not
+	 */
+	public boolean submitUpdate(String statement) {
+		terminal.writer().println(CliStrings.messageInfo(CliStrings.MESSAGE_WILL_EXECUTE).toAnsi());
+		terminal.writer().println(new AttributedString(statement).toString());
+		terminal.flush();
 
+		final Optional<SqlCommandCall> parsedStatement = parseCommand(statement);
+		// only support INSERT INTO
+		return parsedStatement.map(cmdCall -> {
 			switch (cmdCall.command) {
-				case QUIT:
-				case EXIT:
-					callQuit(cmdCall);
-					break;
-				case CLEAR:
-					callClear(cmdCall);
-					break;
-				case RESET:
-					callReset(cmdCall);
-					break;
-				case SET:
-					callSet(cmdCall);
-					break;
-				case HELP:
-					callHelp(cmdCall);
-					break;
-				case SHOW_TABLES:
-					callShowTables(cmdCall);
-					break;
-				case SHOW_FUNCTIONS:
-					callShowFunctions(cmdCall);
-					break;
-				case DESCRIBE:
-					callDescribe(cmdCall);
-					break;
-				case EXPLAIN:
-					callExplain(cmdCall);
-					break;
-				case SELECT:
-					callSelect(cmdCall);
-					break;
-				case SOURCE:
-					callSource(cmdCall);
-					break;
+				case INSERT_INTO:
+					return callInsertInto(cmdCall);
+				default:
+					terminal.writer().println(CliStrings.messageError(CliStrings.MESSAGE_UNSUPPORTED_SQL).toAnsi());
+					terminal.flush();
+					return false;
 			}
-		}
+		}).orElse(false);
 	}
 
 	// --------------------------------------------------------------------------------------------
 
+	private Optional<SqlCommandCall> parseCommand(String line) {
+		final Optional<SqlCommandCall> parsedLine = SqlCommandParser.parse(line);
+		if (!parsedLine.isPresent()) {
+			terminal.writer().println(CliStrings.messageError(CliStrings.MESSAGE_UNKNOWN_SQL).toAnsi());
+			terminal.flush();
+		}
+		return parsedLine;
+	}
+
+	private void callCommand(SqlCommandCall cmdCall) {
+		switch (cmdCall.command) {
+			case QUIT:
+			case EXIT:
+				callQuit(cmdCall);
+				break;
+			case CLEAR:
+				callClear(cmdCall);
+				break;
+			case RESET:
+				callReset(cmdCall);
+				break;
+			case SET:
+				callSet(cmdCall);
+				break;
+			case HELP:
+				callHelp(cmdCall);
+				break;
+			case SHOW_TABLES:
+				callShowTables(cmdCall);
+				break;
+			case SHOW_FUNCTIONS:
+				callShowFunctions(cmdCall);
+				break;
+			case DESCRIBE:
+				callDescribe(cmdCall);
+				break;
+			case EXPLAIN:
+				callExplain(cmdCall);
+				break;
+			case SELECT:
+				callSelect(cmdCall);
+				break;
+			case INSERT_INTO:
+				callInsertInto(cmdCall);
+				break;
+			case SOURCE:
+				callSource(cmdCall);
+				break;
+			default:
+				throw new SqlClientException("Unsupported command: " + cmdCall.command);
+		}
+	}
+
 	private void callQuit(SqlCommandCall cmdCall) {
 		terminal.writer().println(CliStrings.MESSAGE_QUIT);
 		terminal.flush();
@@ -354,6 +397,22 @@ public class CliClient {
 		}
 	}
 
+	private boolean callInsertInto(SqlCommandCall cmdCall) {
+		terminal.writer().println(CliStrings.messageInfo(CliStrings.MESSAGE_SUBMITTING_STATEMENT).toAnsi());
+		terminal.flush();
+
+		try {
+			final ProgramTargetDescriptor programTarget = executor.executeUpdate(context, cmdCall.operands[0]);
+			terminal.writer().println(CliStrings.messageInfo(CliStrings.MESSAGE_STATEMENT_SUBMITTED).toAnsi());
+			terminal.writer().println(programTarget.toString());
+			terminal.flush();
+		} catch (SqlExecutionException e) {
+			printException(e);
+			return false;
+		}
+		return true;
+	}
+
 	private void callSource(SqlCommandCall cmdCall) {
 		final String pathString = cmdCall.operands[0];
 
@@ -384,7 +443,8 @@ public class CliClient {
 		terminal.flush();
 
 		// try to run it
-		callSelect(new SqlCommandCall(SqlCommand.SELECT, new String[] { stmt }));
+		final Optional<SqlCommandCall> call = parseCommand(stmt);
+		call.ifPresent(this::callCommand);
 	}
 
 	// --------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/695bc56a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliOptions.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliOptions.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliOptions.java
index eddcc77..06a063a 100644
--- a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliOptions.java
+++ b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliOptions.java
@@ -33,14 +33,23 @@ public class CliOptions {
 	private final URL defaults;
 	private final List<URL> jars;
 	private final List<URL> libraryDirs;
+	private final String updateStatement;
 
-	public CliOptions(boolean isPrintHelp, String sessionId, URL environment, URL defaults, List<URL> jars, List<URL> libraryDirs) {
+	public CliOptions(
+			boolean isPrintHelp,
+			String sessionId,
+			URL environment,
+			URL defaults,
+			List<URL> jars,
+			List<URL> libraryDirs,
+			String updateStatement) {
 		this.isPrintHelp = isPrintHelp;
 		this.sessionId = sessionId;
 		this.environment = environment;
 		this.defaults = defaults;
 		this.jars = jars;
 		this.libraryDirs = libraryDirs;
+		this.updateStatement = updateStatement;
 	}
 
 	public boolean isPrintHelp() {
@@ -66,4 +75,8 @@ public class CliOptions {
 	public List<URL> getLibraryDirs() {
 		return libraryDirs;
 	}
+
+	public String getUpdateStatement() {
+		return updateStatement;
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/695bc56a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliOptionsParser.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliOptionsParser.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliOptionsParser.java
index 77119ef..b924779 100644
--- a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliOptionsParser.java
+++ b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliOptionsParser.java
@@ -103,6 +103,20 @@ public class CliOptionsParser {
 				"functions, table sources, or sinks. Can be used multiple times.")
 			.build();
 
+	public static final Option OPTION_UPDATE = Option
+			.builder("u")
+			.required(false)
+			.longOpt("update")
+			.numberOfArgs(1)
+			.argName("SQL update statement")
+			.desc(
+				"Experimental (for testing only!): Instructs the SQL Client to immediately execute " +
+				"the given update statement after starting up. The process is shut down after the " +
+				"statement has been submitted to the cluster and returns an appropriate return code. " +
+				"Currently, this feature is only supported for INSERT INTO statements that declare " +
+				"the target sink table.")
+			.build();
+
 	private static final Options EMBEDDED_MODE_CLIENT_OPTIONS = getEmbeddedModeClientOptions(new Options());
 	private static final Options GATEWAY_MODE_CLIENT_OPTIONS = getGatewayModeClientOptions(new Options());
 	private static final Options GATEWAY_MODE_GATEWAY_OPTIONS = getGatewayModeGatewayOptions(new Options());
@@ -118,6 +132,7 @@ public class CliOptionsParser {
 		options.addOption(OPTION_DEFAULTS);
 		options.addOption(OPTION_JAR);
 		options.addOption(OPTION_LIBRARY);
+		options.addOption(OPTION_UPDATE);
 		return options;
 	}
 
@@ -125,6 +140,7 @@ public class CliOptionsParser {
 		buildGeneralOptions(options);
 		options.addOption(OPTION_SESSION);
 		options.addOption(OPTION_ENVIRONMENT);
+		options.addOption(OPTION_UPDATE);
 		return options;
 	}
 
@@ -218,7 +234,8 @@ public class CliOptionsParser {
 				checkUrl(line, CliOptionsParser.OPTION_ENVIRONMENT),
 				checkUrl(line, CliOptionsParser.OPTION_DEFAULTS),
 				checkUrls(line, CliOptionsParser.OPTION_JAR),
-				checkUrls(line, CliOptionsParser.OPTION_LIBRARY)
+				checkUrls(line, CliOptionsParser.OPTION_LIBRARY),
+				line.getOptionValue(CliOptionsParser.OPTION_UPDATE.getOpt())
 			);
 		}
 		catch (ParseException e) {
@@ -236,7 +253,8 @@ public class CliOptionsParser {
 				checkUrl(line, CliOptionsParser.OPTION_ENVIRONMENT),
 				null,
 				checkUrls(line, CliOptionsParser.OPTION_JAR),
-				checkUrls(line, CliOptionsParser.OPTION_LIBRARY)
+				checkUrls(line, CliOptionsParser.OPTION_LIBRARY),
+				line.getOptionValue(CliOptionsParser.OPTION_UPDATE.getOpt())
 			);
 		}
 		catch (ParseException e) {
@@ -254,7 +272,8 @@ public class CliOptionsParser {
 				null,
 				checkUrl(line, CliOptionsParser.OPTION_DEFAULTS),
 				checkUrls(line, CliOptionsParser.OPTION_JAR),
-				checkUrls(line, CliOptionsParser.OPTION_LIBRARY)
+				checkUrls(line, CliOptionsParser.OPTION_LIBRARY),
+				null
 			);
 		}
 		catch (ParseException e) {

http://git-wip-us.apache.org/repos/asf/flink/blob/695bc56a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliStrings.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliStrings.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliStrings.java
index b917317..aef7669 100644
--- a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliStrings.java
+++ b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliStrings.java
@@ -49,6 +49,7 @@ public final class CliStrings {
 		.append(formatCommand(SqlCommand.DESCRIBE, "Describes the schema of a table with the given name."))
 		.append(formatCommand(SqlCommand.EXPLAIN, "Describes the execution plan of a query or table with the given name."))
 		.append(formatCommand(SqlCommand.SELECT, "Executes a SQL SELECT query on the Flink cluster."))
+		.append(formatCommand(SqlCommand.INSERT_INTO, "Inserts the results of a SQL SELECT query into a declared table sink."))
 		.append(formatCommand(SqlCommand.SOURCE, "Reads a SQL SELECT query from a file and executes it on the Flink cluster."))
 		.append(formatCommand(SqlCommand.SET, "Sets a session configuration property. Syntax: 'SET <key>=<value>'. Use 'SET' for listing all properties."))
 		.append(formatCommand(SqlCommand.RESET, "Resets all session configuration properties."))
@@ -122,12 +123,18 @@ public final class CliStrings {
 
 	public static final String MESSAGE_RESULT_QUIT = "Result retrieval cancelled.";
 
+	public static final String MESSAGE_SUBMITTING_STATEMENT = "Submitting SQL update statement to the cluster...";
+
+	public static final String MESSAGE_STATEMENT_SUBMITTED = "Table update statement has been successfully submitted to the cluster:";
+
 	public static final String MESSAGE_INVALID_PATH = "Path is invalid.";
 
 	public static final String MESSAGE_MAX_SIZE_EXCEEDED = "The given file exceeds the maximum number of characters.";
 
 	public static final String MESSAGE_WILL_EXECUTE = "Executing the following statement:";
 
+	public static final String MESSAGE_UNSUPPORTED_SQL = "Unsupported SQL statement.";
+
 	// --------------------------------------------------------------------------------------------
 
 	public static final String RESULT_TITLE = "SQL Query Result";

http://git-wip-us.apache.org/repos/asf/flink/blob/695bc56a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/SqlCommandParser.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/SqlCommandParser.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/SqlCommandParser.java
index 5543a3e..376b1f1 100644
--- a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/SqlCommandParser.java
+++ b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/SqlCommandParser.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.table.client.cli;
 
+import java.util.Optional;
+
 /**
  * Simple parser for determining the type of command and its parameters.
  */
@@ -27,7 +29,7 @@ public final class SqlCommandParser {
 		// private
 	}
 
-	public static SqlCommandCall parse(String stmt) {
+	public static Optional<SqlCommandCall> parse(String stmt) {
 		String trimmed = stmt.trim();
 		// remove ';' at the end because many people type it intuitively
 		if (trimmed.endsWith(";")) {
@@ -43,10 +45,11 @@ public final class SqlCommandParser {
 					// match
 					if (tokenCount < cmd.tokens.length && token.equalsIgnoreCase(cmd.tokens[tokenCount])) {
 						if (tokenCount == cmd.tokens.length - 1) {
-							return new SqlCommandCall(
+							final SqlCommandCall call = new SqlCommandCall(
 								cmd,
 								splitOperands(cmd, trimmed, trimmed.substring(Math.min(pos, trimmed.length())))
 							);
+							return Optional.of(call);
 						}
 					} else {
 						// next sql command
@@ -56,7 +59,7 @@ public final class SqlCommandParser {
 				}
 			}
 		}
-		return null;
+		return Optional.empty();
 	}
 
 	private static String[] splitOperands(SqlCommand cmd, String originalCall, String operands) {
@@ -69,6 +72,7 @@ public final class SqlCommandParser {
 					return new String[] {operands.substring(0, delimiter), operands.substring(delimiter + 1)};
 				}
 			case SELECT:
+			case INSERT_INTO:
 				return new String[] {originalCall};
 			default:
 				return new String[] {operands};
@@ -90,6 +94,7 @@ public final class SqlCommandParser {
 		DESCRIBE("describe"),
 		EXPLAIN("explain"),
 		SELECT("select"),
+		INSERT_INTO("insert into"),
 		SET("set"),
 		RESET("reset"),
 		SOURCE("source");

http://git-wip-us.apache.org/repos/asf/flink/blob/695bc56a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/Executor.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/Executor.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/Executor.java
index 9ace240..7f903a4 100644
--- a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/Executor.java
+++ b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/Executor.java
@@ -88,6 +88,15 @@ public interface Executor {
 	void cancelQuery(SessionContext session, String resultId) throws SqlExecutionException;
 
 	/**
+	 * Submits a Flink SQL update statement such as INSERT INTO.
+	 *
+	 * @param session context in which the statement is executed
+	 * @param statement SQL update statement (currently only INSERT INTO is supported)
+	 * @return information about the target of the submitted Flink job
+	 */
+	ProgramTargetDescriptor executeUpdate(SessionContext session, String statement) throws SqlExecutionException;
+
+	/**
 	 * Stops the executor.
 	 */
 	void stop(SessionContext session);

http://git-wip-us.apache.org/repos/asf/flink/blob/695bc56a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/ProgramTargetDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/ProgramTargetDescriptor.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/ProgramTargetDescriptor.java
new file mode 100644
index 0000000..7f6b3d3
--- /dev/null
+++ b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/ProgramTargetDescriptor.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.client.gateway;
+
+import org.apache.flink.api.common.JobID;
+
+/**
+ * Describes the target where a table program has been submitted to.
+ */
+public class ProgramTargetDescriptor {
+
+	private final String clusterId;
+
+	private final String jobId;
+
+	private final String webInterfaceUrl;
+
+	public ProgramTargetDescriptor(String clusterId, String jobId, String webInterfaceUrl) {
+		this.clusterId = clusterId;
+		this.jobId = jobId;
+		this.webInterfaceUrl = webInterfaceUrl;
+	}
+
+	public String getClusterId() {
+		return clusterId;
+	}
+
+	public String getJobId() {
+		return jobId;
+	}
+
+	public String getWebInterfaceUrl() {
+		return webInterfaceUrl;
+	}
+
+	@Override
+	public String toString() {
+		return String.format(
+			"Cluster ID: %s\n" +
+			"Job ID: %s\n" +
+			"Web interface: %s",
+			clusterId, jobId, webInterfaceUrl);
+	}
+
+	/**
+	 * Creates a program target description from deployment classes.
+	 *
+	 * @param clusterId cluster id
+	 * @param jobId job id
+	 * @param <C> cluster id type
+	 * @return program target descriptor
+	 */
+	public static <C> ProgramTargetDescriptor of(C clusterId, JobID jobId, String webInterfaceUrl) {
+		String clusterIdString;
+		try {
+			// check if the cluster id class itself declares (overrides) toString
+			clusterId.getClass().getDeclaredMethod("toString");
+			clusterIdString = clusterId.toString();
+		} catch (NoSuchMethodException e) {
+			clusterIdString = clusterId.getClass().getSimpleName();
+		}
+		return new ProgramTargetDescriptor(clusterIdString, jobId.toString(), webInterfaceUrl);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/695bc56a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ChangelogCollectStreamResult.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ChangelogCollectStreamResult.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ChangelogCollectStreamResult.java
deleted file mode 100644
index 2375584..0000000
--- a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ChangelogCollectStreamResult.java
+++ /dev/null
@@ -1,99 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.client.gateway.local;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.table.client.gateway.TypedResult;
-import org.apache.flink.types.Row;
-
-import java.net.InetAddress;
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * Collects results and returns them as a changelog.
- *
- * @param <C> cluster id to which this result belongs to
- */
-public class ChangelogCollectStreamResult<C> extends CollectStreamResult<C> implements ChangelogResult<C> {
-
-	private List<Tuple2<Boolean, Row>> changeRecordBuffer;
-	private static final int CHANGE_RECORD_BUFFER_SIZE = 5_000;
-
-	public ChangelogCollectStreamResult(TypeInformation<Row> outputType, ExecutionConfig config,
-			InetAddress gatewayAddress, int gatewayPort) {
-		super(outputType, config, gatewayAddress, gatewayPort);
-
-		// prepare for changelog
-		changeRecordBuffer = new ArrayList<>();
-	}
-
-	@Override
-	public boolean isMaterialized() {
-		return false;
-	}
-
-	@Override
-	public TypedResult<List<Tuple2<Boolean, Row>>> retrieveChanges() {
-		synchronized (resultLock) {
-			// retrieval thread is alive return a record if available
-			// but the program must not have failed
-			if (isRetrieving() && executionException == null) {
-				if (changeRecordBuffer.isEmpty()) {
-					return TypedResult.empty();
-				} else {
-					final List<Tuple2<Boolean, Row>> change = new ArrayList<>(changeRecordBuffer);
-					changeRecordBuffer.clear();
-					resultLock.notify();
-					return TypedResult.payload(change);
-				}
-			}
-			// retrieval thread is dead but there is still a record to be delivered
-			else if (!isRetrieving() && !changeRecordBuffer.isEmpty()) {
-				final List<Tuple2<Boolean, Row>> change = new ArrayList<>(changeRecordBuffer);
-				changeRecordBuffer.clear();
-				return TypedResult.payload(change);
-			}
-			// no results can be returned anymore
-			else {
-				return handleMissingResult();
-			}
-		}
-	}
-
-	// --------------------------------------------------------------------------------------------
-
-	@Override
-	protected void processRecord(Tuple2<Boolean, Row> change) {
-		synchronized (resultLock) {
-			// wait if the buffer is full
-			if (changeRecordBuffer.size() >= CHANGE_RECORD_BUFFER_SIZE) {
-				try {
-					resultLock.wait();
-				} catch (InterruptedException e) {
-					// ignore
-				}
-			} else {
-				changeRecordBuffer.add(change);
-			}
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/695bc56a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ChangelogResult.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ChangelogResult.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ChangelogResult.java
deleted file mode 100644
index 6d4f95a..0000000
--- a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ChangelogResult.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.client.gateway.local;
-
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.table.client.gateway.TypedResult;
-import org.apache.flink.types.Row;
-
-import java.util.List;
-
-/**
- * A result that is represented as a changelog consisting of insert and delete records.
- *
- * @param <C> cluster id to which this result belongs to
- */
-public interface ChangelogResult<C> extends DynamicResult<C> {
-
-	/**
-	 * Retrieves the available result records.
-	 */
-	TypedResult<List<Tuple2<Boolean, Row>>> retrieveChanges();
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/695bc56a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/CollectStreamResult.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/CollectStreamResult.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/CollectStreamResult.java
deleted file mode 100644
index b7a0e79..0000000
--- a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/CollectStreamResult.java
+++ /dev/null
@@ -1,175 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.client.gateway.local;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeinfo.Types;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.datastream.DataStreamUtils;
-import org.apache.flink.streaming.experimental.SocketStreamIterator;
-import org.apache.flink.table.client.SqlClientException;
-import org.apache.flink.table.client.gateway.SqlExecutionException;
-import org.apache.flink.table.client.gateway.TypedResult;
-import org.apache.flink.table.sinks.TableSink;
-import org.apache.flink.types.Row;
-
-import java.io.IOException;
-import java.net.InetAddress;
-
-/**
- * A result that works similarly to {@link DataStreamUtils#collect(DataStream)}.
- *
- * @param <C> cluster id to which this result belongs to
- */
-public abstract class CollectStreamResult<C> implements DynamicResult<C> {
-
-	private final TypeInformation<Row> outputType;
-	private final SocketStreamIterator<Tuple2<Boolean, Row>> iterator;
-	private final CollectStreamTableSink collectTableSink;
-	private final ResultRetrievalThread retrievalThread;
-	private final JobMonitoringThread monitoringThread;
-	private ProgramDeployer<C> deployer;
-	private C clusterId;
-
-	protected final Object resultLock;
-	protected SqlExecutionException executionException;
-
-	public CollectStreamResult(TypeInformation<Row> outputType, ExecutionConfig config,
-			InetAddress gatewayAddress, int gatewayPort) {
-		this.outputType = outputType;
-
-		resultLock = new Object();
-
-		// create socket stream iterator
-		final TypeInformation<Tuple2<Boolean, Row>> socketType = Types.TUPLE(Types.BOOLEAN, outputType);
-		final TypeSerializer<Tuple2<Boolean, Row>> serializer = socketType.createSerializer(config);
-		try {
-			// pass gateway port and address such that iterator knows where to bind to
-			iterator = new SocketStreamIterator<>(gatewayPort, gatewayAddress, serializer);
-		} catch (IOException e) {
-			throw new SqlClientException("Could not start socket for result retrieval.", e);
-		}
-
-		// create table sink
-		// pass binding address and port such that sink knows where to send to
-		collectTableSink = new CollectStreamTableSink(iterator.getBindAddress(), iterator.getPort(), serializer);
-		retrievalThread = new ResultRetrievalThread();
-		monitoringThread = new JobMonitoringThread();
-	}
-
-	@Override
-	public void setClusterId(C clusterId) {
-		if (this.clusterId != null) {
-			throw new IllegalStateException("Cluster id is already present.");
-		}
-		this.clusterId = clusterId;
-	}
-
-	@Override
-	public TypeInformation<Row> getOutputType() {
-		return outputType;
-	}
-
-	@Override
-	public void startRetrieval(ProgramDeployer<C> deployer) {
-		// start listener thread
-		retrievalThread.start();
-
-		// start deployer
-		this.deployer = deployer;
-		monitoringThread.start();
-	}
-
-	@Override
-	public TableSink<?> getTableSink() {
-		return collectTableSink;
-	}
-
-	@Override
-	public void close() {
-		retrievalThread.isRunning = false;
-		retrievalThread.interrupt();
-		monitoringThread.interrupt();
-		iterator.close();
-	}
-
-	// --------------------------------------------------------------------------------------------
-
-	protected <T> TypedResult<T> handleMissingResult() {
-		// check if the monitoring thread is still there
-		// we need to wait until we know what is going on
-		if (monitoringThread.isAlive()) {
-			return TypedResult.empty();
-		}
-		// the job finished with an exception
-		else if (executionException != null) {
-			throw executionException;
-		}
-		// we assume that a bounded job finished
-		else {
-			return TypedResult.endOfStream();
-		}
-	}
-
-	protected boolean isRetrieving() {
-		return retrievalThread.isRunning;
-	}
-
-	protected abstract void processRecord(Tuple2<Boolean, Row> change);
-
-	// --------------------------------------------------------------------------------------------
-
-	private class JobMonitoringThread extends Thread {
-
-		@Override
-		public void run() {
-			try {
-				deployer.run();
-			} catch (SqlExecutionException e) {
-				executionException = e;
-			}
-		}
-	}
-
-	// --------------------------------------------------------------------------------------------
-
-	private class ResultRetrievalThread extends Thread {
-
-		public volatile boolean isRunning = true;
-
-		@Override
-		public void run() {
-			try {
-				while (isRunning && iterator.hasNext()) {
-					final Tuple2<Boolean, Row> change = iterator.next();
-					processRecord(change);
-				}
-			} catch (RuntimeException e) {
-				// ignore socket exceptions
-			}
-
-			// no result anymore
-			// either the job is done or an error occurred
-			isRunning = false;
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/695bc56a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/DynamicResult.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/DynamicResult.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/DynamicResult.java
deleted file mode 100644
index 7c6f8c2..0000000
--- a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/DynamicResult.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.client.gateway.local;
-
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.table.sinks.TableSink;
-import org.apache.flink.types.Row;
-
-/**
- * A result of a dynamic table program.
- *
- * <p>Note: Make sure to call close() after the result is not needed anymore.
- *
- * @param <C> type of the cluster id to which this result belongs to
- */
-public interface DynamicResult<C> {
-
-	/**
-	 * Sets the cluster id of the cluster this result comes from. This method should only be called once.
-	 */
-	void setClusterId(C clusterId);
-
-	/**
-	 * Returns whether this result is materialized such that snapshots can be taken or results
-	 * must be retrieved record-wise.
-	 */
-	boolean isMaterialized();
-
-	/**
-	 * Returns the output type as defined by the query.
-	 */
-	TypeInformation<Row> getOutputType();
-
-	/**
-	 * Starts the table program using the given deployer and monitors it's execution.
-	 */
-	void startRetrieval(ProgramDeployer<C> deployer);
-
-	/**
-	 * Returns the table sink required by this result type.
-	 */
-	TableSink<?> getTableSink();
-
-	/**
-	 * Closes the retrieval and all involved threads.
-	 */
-	void close();
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/695bc56a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
index 13e282a..926bdb0 100644
--- a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
+++ b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
@@ -169,6 +169,14 @@ public class ExecutionContext<T> {
 		return new EnvironmentInstance();
 	}
 
+	public Map<String, TableSource<?>> getTableSources() {
+		return tableSources;
+	}
+
+	public Map<String, TableSink<?>> getTableSinks() {
+		return tableSinks;
+	}
+
 	// --------------------------------------------------------------------------------------------
 
 	private static CommandLine createCommandLine(Deployment deployment, Options commandLineOptions) {

http://git-wip-us.apache.org/repos/asf/flink/blob/695bc56a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
index d658ee9..b2e8271 100644
--- a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
+++ b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
@@ -31,16 +31,22 @@ import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.table.api.QueryConfig;
 import org.apache.flink.table.api.Table;
 import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.client.SqlClientException;
 import org.apache.flink.table.client.config.Environment;
 import org.apache.flink.table.client.gateway.Executor;
+import org.apache.flink.table.client.gateway.ProgramTargetDescriptor;
 import org.apache.flink.table.client.gateway.ResultDescriptor;
 import org.apache.flink.table.client.gateway.SessionContext;
 import org.apache.flink.table.client.gateway.SqlExecutionException;
 import org.apache.flink.table.client.gateway.TypedResult;
+import org.apache.flink.table.client.gateway.local.result.BasicResult;
+import org.apache.flink.table.client.gateway.local.result.ChangelogResult;
+import org.apache.flink.table.client.gateway.local.result.DynamicResult;
+import org.apache.flink.table.client.gateway.local.result.MaterializedResult;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.StringUtils;
 
@@ -277,6 +283,12 @@ public class LocalExecutor implements Executor {
 	}
 
 	@Override
+	public ProgramTargetDescriptor executeUpdate(SessionContext session, String statement) throws SqlExecutionException {
+		final ExecutionContext<?> context = getOrCreateExecutionContext(session);
+		return executeUpdateInternal(context, statement);
+	}
+
+	@Override
 	public void stop(SessionContext session) {
 		resultStore.getResults().forEach((resultId) -> {
 			try {
@@ -329,14 +341,43 @@ public class LocalExecutor implements Executor {
 		}
 	}
 
-	private <T> ResultDescriptor executeQueryInternal(ExecutionContext<T> context, String query) {
+	private <C> ProgramTargetDescriptor executeUpdateInternal(ExecutionContext<C> context, String statement) {
+		final ExecutionContext.EnvironmentInstance envInst = context.createEnvironmentInstance();
+
+		applyUpdate(envInst.getTableEnvironment(), envInst.getQueryConfig(), statement);
+
+		// create job graph with dependencies
+		final String jobName = context.getSessionContext().getName() + ": " + statement;
+		final JobGraph jobGraph;
+		try {
+			jobGraph = envInst.createJobGraph(jobName);
+		} catch (Throwable t) {
+			// catch everything such that the statement does not crash the executor
+			throw new SqlExecutionException("Invalid SQL statement.", t);
+		}
+
+		// create execution
+		final BasicResult<C> result = new BasicResult<>();
+		final ProgramDeployer<C> deployer = new ProgramDeployer<>(
+			context, jobName, jobGraph, result, false);
+
+		// blocking deployment
+		deployer.run();
+
+		return ProgramTargetDescriptor.of(
+			result.getClusterId(),
+			jobGraph.getJobID(),
+			result.getWebInterfaceUrl());
+	}
+
+	private <C> ResultDescriptor executeQueryInternal(ExecutionContext<C> context, String query) {
 		final ExecutionContext.EnvironmentInstance envInst = context.createEnvironmentInstance();
 
 		// create table
 		final Table table = createTable(envInst.getTableEnvironment(), query);
 
 		// initialize result
-		final DynamicResult<T> result = resultStore.createResult(
+		final DynamicResult<C> result = resultStore.createResult(
 			context.getMergedEnvironment(),
 			table.getSchema().withoutTimeAttributes(),
 			envInst.getExecutionConfig());
@@ -352,7 +393,7 @@ public class LocalExecutor implements Executor {
 			// it not stored in the result store
 			result.close();
 			// catch everything such that the query does not crash the executor
-			throw new SqlExecutionException("Invalid SQL statement.", t);
+			throw new SqlExecutionException("Invalid SQL query.", t);
 		}
 
 		// store the result with a unique id (the job id for now)
@@ -360,7 +401,8 @@ public class LocalExecutor implements Executor {
 		resultStore.storeResult(resultId, result);
 
 		// create execution
-		final ProgramDeployer<T> deployer = new ProgramDeployer<>(context, jobName, jobGraph, result);
+		final ProgramDeployer<C> deployer = new ProgramDeployer<>(
+			context, jobName, jobGraph, result, true);
 
 		// start result retrieval
 		result.startRetrieval(deployer);
@@ -371,10 +413,13 @@ public class LocalExecutor implements Executor {
 			result.isMaterialized());
 	}
 
-	private Table createTable(TableEnvironment tableEnv, String query) {
+	/**
+	 * Creates a table using the given query in the given table environment.
+	 */
+	private Table createTable(TableEnvironment tableEnv, String selectQuery) {
 		// parse and validate query
 		try {
-			return tableEnv.sqlQuery(query);
+			return tableEnv.sqlQuery(selectQuery);
 		} catch (Throwable t) {
 			// catch everything such that the query does not crash the executor
 			throw new SqlExecutionException("Invalid SQL statement.", t);
@@ -382,6 +427,19 @@ public class LocalExecutor implements Executor {
 	}
 
 	/**
+	 * Applies the given update statement to the given table environment with query configuration.
+	 */
+	private void applyUpdate(TableEnvironment tableEnv, QueryConfig queryConfig, String updateStatement) {
+		// parse and validate statement
+		try {
+			tableEnv.sqlUpdate(updateStatement, queryConfig);
+		} catch (Throwable t) {
+			// catch everything such that the statement does not crash the executor
+			throw new SqlExecutionException("Invalid SQL update statement.", t);
+		}
+	}
+
+	/**
 	 * Creates or reuses the execution context.
 	 */
 	private synchronized ExecutionContext<?> getOrCreateExecutionContext(SessionContext session) throws SqlExecutionException {

http://git-wip-us.apache.org/repos/asf/flink/blob/695bc56a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/MaterializedCollectBatchResult.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/MaterializedCollectBatchResult.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/MaterializedCollectBatchResult.java
deleted file mode 100644
index 3bdc1fa..0000000
--- a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/MaterializedCollectBatchResult.java
+++ /dev/null
@@ -1,159 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.client.gateway.local;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.JobExecutionResult;
-import org.apache.flink.api.common.accumulators.SerializedListAccumulator;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.table.client.gateway.SqlExecutionException;
-import org.apache.flink.table.client.gateway.TypedResult;
-import org.apache.flink.table.sinks.TableSink;
-import org.apache.flink.types.Row;
-import org.apache.flink.util.AbstractID;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * Collects results using accumulators and returns them as table snapshots.
- */
-public class MaterializedCollectBatchResult<C> implements MaterializedResult<C> {
-
-	private final TypeInformation<Row> outputType;
-	private final String accumulatorName;
-	private final CollectBatchTableSink tableSink;
-	private final Object resultLock;
-	private final Thread retrievalThread;
-
-	private ProgramDeployer<C> deployer;
-	private C clusterId;
-	private int pageSize;
-	private int pageCount;
-	private SqlExecutionException executionException;
-	private List<Row> resultTable;
-
-	private volatile boolean snapshotted = false;
-
-	public MaterializedCollectBatchResult(TypeInformation<Row> outputType, ExecutionConfig config) {
-		this.outputType = outputType;
-
-		accumulatorName = new AbstractID().toString();
-		tableSink = new CollectBatchTableSink(accumulatorName, outputType.createSerializer(config));
-		resultLock = new Object();
-		retrievalThread = new ResultRetrievalThread();
-
-		pageCount = 0;
-	}
-
-	@Override
-	public void setClusterId(C clusterId) {
-		if (this.clusterId != null) {
-			throw new IllegalStateException("Cluster id is already present.");
-		}
-		this.clusterId = clusterId;
-	}
-
-	@Override
-	public boolean isMaterialized() {
-		return true;
-	}
-
-	@Override
-	public TypeInformation<Row> getOutputType() {
-		return outputType;
-	}
-
-	@Override
-	public void startRetrieval(ProgramDeployer<C> deployer) {
-		this.deployer = deployer;
-		retrievalThread.start();
-	}
-
-	@Override
-	public TableSink<?> getTableSink() {
-		return tableSink;
-	}
-
-	@Override
-	public void close() {
-		retrievalThread.interrupt();
-	}
-
-	@Override
-	public List<Row> retrievePage(int page) {
-		synchronized (resultLock) {
-			if (page <= 0 || page > pageCount) {
-				throw new SqlExecutionException("Invalid page '" + page + "'.");
-			}
-			return resultTable.subList(pageSize * (page - 1), Math.min(resultTable.size(), page * pageSize));
-		}
-	}
-
-	@Override
-	public TypedResult<Integer> snapshot(int pageSize) {
-		synchronized (resultLock) {
-			// wait for a result
-			if (retrievalThread.isAlive() && null == resultTable) {
-				return TypedResult.empty();
-			}
-			// the job finished with an exception
-			else if (executionException != null) {
-				throw executionException;
-			}
-			// we return a payload result the first time and EoS for the rest of times as if the results
-			// are retrieved dynamically
-			else if (!snapshotted) {
-				snapshotted = true;
-				this.pageSize = pageSize;
-				pageCount = Math.max(1, (int) Math.ceil(((double) resultTable.size() / pageSize)));
-				return TypedResult.payload(pageCount);
-			} else {
-				return TypedResult.endOfStream();
-			}
-		}
-	}
-
-	// --------------------------------------------------------------------------------------------
-
-	private class ResultRetrievalThread extends Thread {
-
-		@Override
-		public void run() {
-			try {
-				deployer.run();
-				final JobExecutionResult result = deployer.fetchExecutionResult();
-				final ArrayList<byte[]> accResult = result.getAccumulatorResult(accumulatorName);
-				if (accResult == null) {
-					throw new SqlExecutionException("The accumulator could not retrieve the result.");
-				}
-				final List<Row> resultTable = SerializedListAccumulator.deserializeList(accResult, tableSink.getSerializer());
-				// sets the result table all at once
-				synchronized (resultLock) {
-					MaterializedCollectBatchResult.this.resultTable = resultTable;
-				}
-			} catch (ClassNotFoundException | IOException e) {
-				executionException = new SqlExecutionException("Serialization error while deserializing collected data.", e);
-			} catch (SqlExecutionException e) {
-				executionException = e;
-			}
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/695bc56a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/MaterializedCollectStreamResult.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/MaterializedCollectStreamResult.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/MaterializedCollectStreamResult.java
deleted file mode 100644
index 0ce270e..0000000
--- a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/MaterializedCollectStreamResult.java
+++ /dev/null
@@ -1,135 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.client.gateway.local;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.table.client.gateway.SqlExecutionException;
-import org.apache.flink.table.client.gateway.TypedResult;
-import org.apache.flink.types.Row;
-
-import java.net.InetAddress;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-/**
- * Collects results and returns them as table snapshots.
- *
- * @param <C> cluster id to which this result belongs to
- */
-public class MaterializedCollectStreamResult<C> extends CollectStreamResult<C> implements MaterializedResult<C> {
-
-	private final List<Row> materializedTable;
-	private final Map<Row, List<Integer>> rowPositions; // positions of rows in table for faster access
-	private final List<Row> snapshot;
-	private int pageCount;
-	private int pageSize;
-	private boolean isLastSnapshot;
-
-	public MaterializedCollectStreamResult(TypeInformation<Row> outputType, ExecutionConfig config,
-			InetAddress gatewayAddress, int gatewayPort) {
-		super(outputType, config, gatewayAddress, gatewayPort);
-
-		// prepare for materialization
-		materializedTable = new ArrayList<>();
-		rowPositions = new HashMap<>();
-		snapshot = new ArrayList<>();
-		isLastSnapshot = false;
-		pageCount = 0;
-	}
-
-	@Override
-	public boolean isMaterialized() {
-		return true;
-	}
-
-	@Override
-	public TypedResult<Integer> snapshot(int pageSize) {
-		synchronized (resultLock) {
-			// retrieval thread is dead and there are no results anymore
-			// or program failed
-			if ((!isRetrieving() && isLastSnapshot) || executionException != null) {
-				return handleMissingResult();
-			}
-			// this snapshot is the last result that can be delivered
-			else if (!isRetrieving()) {
-				isLastSnapshot = true;
-			}
-
-			this.pageSize = pageSize;
-			snapshot.clear();
-			snapshot.addAll(materializedTable);
-
-			// at least one page
-			pageCount = Math.max(1, (int) Math.ceil(((double) snapshot.size() / pageSize)));
-
-			return TypedResult.payload(pageCount);
-		}
-	}
-
-	@Override
-	public List<Row> retrievePage(int page) {
-		synchronized (resultLock) {
-			if (page <= 0 || page > pageCount) {
-				throw new SqlExecutionException("Invalid page '" + page + "'.");
-			}
-
-			return snapshot.subList(pageSize * (page - 1), Math.min(snapshot.size(), pageSize * page));
-		}
-	}
-
-	// --------------------------------------------------------------------------------------------
-
-	@Override
-	protected void processRecord(Tuple2<Boolean, Row> change) {
-		// we track the position of rows for faster access and in order to return consistent
-		// snapshots where new rows are appended at the end
-		synchronized (resultLock) {
-			final List<Integer> positions = rowPositions.get(change.f1);
-
-			// insert
-			if (change.f0) {
-				materializedTable.add(change.f1);
-				if (positions == null) {
-					// new row
-					final ArrayList<Integer> pos = new ArrayList<>(1);
-					pos.add(materializedTable.size() - 1);
-					rowPositions.put(change.f1, pos);
-				} else {
-					// row exists already, only add position
-					positions.add(materializedTable.size() - 1);
-				}
-			}
-			// delete
-			else {
-				if (positions != null) {
-					// delete row position and row itself
-					final int pos = positions.remove(positions.size() - 1);
-					materializedTable.remove(pos);
-					if (positions.isEmpty()) {
-						rowPositions.remove(change.f1);
-					}
-				}
-			}
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/695bc56a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/MaterializedResult.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/MaterializedResult.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/MaterializedResult.java
deleted file mode 100644
index 858af4d..0000000
--- a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/MaterializedResult.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.client.gateway.local;
-
-import org.apache.flink.table.client.gateway.TypedResult;
-import org.apache.flink.types.Row;
-
-import java.util.List;
-
-/**
- * A result that is materialized and can be viewed by navigating through a snapshot.
- *
- * @param <C> cluster id to which this result belongs to
- */
-public interface MaterializedResult<C> extends DynamicResult<C> {
-
-	/**
-	 * Takes a snapshot of the current table and returns the number of pages for navigating
-	 * through the snapshot.
-	 */
-	TypedResult<Integer> snapshot(int pageSize);
-
-	/**
-	 * Retrieves a page of a snapshotted result.
-	 */
-	List<Row> retrievePage(int page);
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/695bc56a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ProgramDeployer.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ProgramDeployer.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ProgramDeployer.java
index 5c5cf98..05d12ff 100644
--- a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ProgramDeployer.java
+++ b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ProgramDeployer.java
@@ -24,6 +24,7 @@ import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.client.program.rest.RestClusterClient;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.table.client.gateway.SqlExecutionException;
+import org.apache.flink.table.client.gateway.local.result.Result;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -40,18 +41,30 @@ public class ProgramDeployer<C> implements Runnable {
 	private final ExecutionContext<C> context;
 	private final JobGraph jobGraph;
 	private final String jobName;
-	private final DynamicResult<C> result;
+	private final Result<C> result;
+	private final boolean awaitJobResult;
 	private final BlockingQueue<JobExecutionResult> executionResultBucket;
 
+	/**
+	 * Deploys a table program on the cluster.
+	 *
+	 * @param context        context with deployment information
+	 * @param jobName        job name of the Flink job to be submitted
+	 * @param jobGraph       Flink job graph
+	 * @param result         result that receives information about the target cluster
+	 * @param awaitJobResult block for a job execution result from the cluster
+	 */
 	public ProgramDeployer(
 			ExecutionContext<C> context,
 			String jobName,
 			JobGraph jobGraph,
-			DynamicResult<C> result) {
+			Result<C> result,
+			boolean awaitJobResult) {
 		this.context = context;
 		this.jobGraph = jobGraph;
 		this.jobName = jobName;
 		this.result = result;
+		this.awaitJobResult = awaitJobResult;
 		executionResultBucket = new LinkedBlockingDeque<>(1);
 	}
 
@@ -62,7 +75,7 @@ public class ProgramDeployer<C> implements Runnable {
 			LOG.debug("Submitting job {} with the following environment: \n{}",
 					jobGraph.getJobID(), context.getMergedEnvironment());
 		}
-		executionResultBucket.add(deployJob(context, jobGraph, result));
+		deployJob(context, jobGraph, result);
 	}
 
 	public JobExecutionResult fetchExecutionResult() {
@@ -73,45 +86,20 @@ public class ProgramDeployer<C> implements Runnable {
 	 * Deploys a job. Depending on the deployment creates a new job cluster. It saves the cluster id in
 	 * the result and blocks until job completion.
 	 */
-	private <T> JobExecutionResult deployJob(ExecutionContext<T> context, JobGraph jobGraph, DynamicResult<T> result) {
+	private <T> void deployJob(ExecutionContext<T> context, JobGraph jobGraph, Result<T> result) {
 		// create or retrieve cluster and deploy job
 		try (final ClusterDescriptor<T> clusterDescriptor = context.createClusterDescriptor()) {
-			ClusterClient<T> clusterClient = null;
 			try {
 				// new cluster
 				if (context.getClusterId() == null) {
-					// deploy job cluster with job attached
-					clusterClient = clusterDescriptor.deployJobCluster(context.getClusterSpec(), jobGraph, false);
-					// save the new cluster id
-					result.setClusterId(clusterClient.getClusterId());
-					// we need to hard cast for now
-					return ((RestClusterClient<T>) clusterClient)
-							.requestJobResult(jobGraph.getJobID())
-							.get()
-							.toJobExecutionResult(context.getClassLoader()); // throws exception if job fails
+					deployJobOnNewCluster(clusterDescriptor, jobGraph, result, context.getClassLoader());
 				}
 				// reuse existing cluster
 				else {
-					// retrieve existing cluster
-					clusterClient = clusterDescriptor.retrieve(context.getClusterId());
-					// save the cluster id
-					result.setClusterId(clusterClient.getClusterId());
-					// submit the job
-					clusterClient.setDetached(false);
-					return clusterClient
-						.submitJob(jobGraph, context.getClassLoader())
-						.getJobExecutionResult(); // throws exception if job fails
+					deployJobOnExistingCluster(context.getClusterId(), clusterDescriptor, jobGraph, result);
 				}
 			} catch (Exception e) {
 				throw new SqlExecutionException("Could not retrieve or create a cluster.", e);
-			} finally {
-				try {
-					if (clusterClient != null) {
-						clusterClient.shutdown();
-					}
-				} catch (Exception e) {
-					// ignore
-				}
 			}
 		} catch (SqlExecutionException e) {
 			throw e;
@@ -119,5 +107,77 @@ public class ProgramDeployer<C> implements Runnable {
 			throw new SqlExecutionException("Could not locate a cluster.", e);
 		}
 	}
+
+	/**
+	 * Deploys a new job cluster with the given job attached and stores the id and the web
+	 * interface URL of that cluster in the result.
+	 *
+	 * <p>If {@code awaitJobResult} is set, this method blocks until the job result is available
+	 * and puts it into the execution result bucket. The cluster client is shut down in any case.
+	 *
+	 * @param clusterDescriptor descriptor used for deploying the job cluster
+	 * @param jobGraph          Flink job graph to be deployed
+	 * @param result            result that receives information about the new cluster
+	 * @param classLoader       class loader used for converting the job result
+	 * @throws Exception if the deployment or the job fails
+	 */
+	private <T> void deployJobOnNewCluster(
+			ClusterDescriptor<T> clusterDescriptor,
+			JobGraph jobGraph,
+			Result<T> result,
+			ClassLoader classLoader) throws Exception {
+		ClusterClient<T> clusterClient = null;
+		try {
+			// deploy job cluster with job attached
+			clusterClient = clusterDescriptor.deployJobCluster(context.getClusterSpec(), jobGraph, false);
+			// save information about the new cluster
+			result.setClusterInformation(clusterClient.getClusterId(), clusterClient.getWebInterfaceURL());
+			// get result
+			if (awaitJobResult) {
+				// we need to hard cast for now
+				// use the class loader handed in by the caller instead of reaching into the context again
+				final JobExecutionResult jobResult = ((RestClusterClient<T>) clusterClient)
+						.requestJobResult(jobGraph.getJobID())
+						.get()
+						.toJobExecutionResult(classLoader); // throws exception if job fails
+				executionResultBucket.add(jobResult);
+			}
+		} finally {
+			try {
+				if (clusterClient != null) {
+					clusterClient.shutdown();
+				}
+			} catch (Exception e) {
+				// ignore, shutting down the client is best effort
+			}
+		}
+	}
+
+	/**
+	 * Submits the job to an already running cluster and stores the id and the web interface URL
+	 * of that cluster in the result.
+	 *
+	 * <p>If {@code awaitJobResult} is set, the job is submitted in attached mode and its execution
+	 * result is put into the execution result bucket. Otherwise the job is submitted detached.
+	 * The cluster client is shut down in any case.
+	 */
+	private <T> void deployJobOnExistingCluster(
+			T clusterId,
+			ClusterDescriptor<T> clusterDescriptor,
+			JobGraph jobGraph,
+			Result<T> result) throws Exception {
+		ClusterClient<T> client = null;
+		try {
+			// look up the running cluster
+			client = clusterDescriptor.retrieve(clusterId);
+
+			// retrieving the web interface URL might fail on legacy pre-FLIP-6 code paths
+			// TODO remove this once we drop support for legacy deployment code
+			String url;
+			try {
+				url = client.getWebInterfaceURL();
+			} catch (Exception e) {
+				url = "N/A";
+			}
+
+			// remember where the job is going to run
+			result.setClusterInformation(client.getClusterId(), url);
+
+			// attached submission waits for the result, detached submission returns right away
+			client.setDetached(!awaitJobResult);
+			if (!awaitJobResult) {
+				client.submitJob(jobGraph, context.getClassLoader());
+			} else {
+				final JobExecutionResult jobResult = client
+					.submitJob(jobGraph, context.getClassLoader())
+					.getJobExecutionResult(); // throws exception if job fails
+				executionResultBucket.add(jobResult);
+			}
+		} finally {
+			if (client != null) {
+				try {
+					client.shutdown();
+				} catch (Exception e) {
+					// ignore
+				}
+			}
+		}
+	}
 }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/695bc56a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ResultStore.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ResultStore.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ResultStore.java
index f70378c..a54160f 100644
--- a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ResultStore.java
+++ b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ResultStore.java
@@ -29,6 +29,10 @@ import org.apache.flink.table.client.SqlClientException;
 import org.apache.flink.table.client.config.Deployment;
 import org.apache.flink.table.client.config.Environment;
 import org.apache.flink.table.client.gateway.SqlExecutionException;
+import org.apache.flink.table.client.gateway.local.result.ChangelogCollectStreamResult;
+import org.apache.flink.table.client.gateway.local.result.DynamicResult;
+import org.apache.flink.table.client.gateway.local.result.MaterializedCollectBatchResult;
+import org.apache.flink.table.client.gateway.local.result.MaterializedCollectStreamResult;
 import org.apache.flink.types.Row;
 
 import java.net.InetAddress;

http://git-wip-us.apache.org/repos/asf/flink/blob/695bc56a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/result/BasicResult.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/result/BasicResult.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/result/BasicResult.java
new file mode 100644
index 0000000..47e0122
--- /dev/null
+++ b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/result/BasicResult.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.client.gateway.local.result;
+
+/**
+ * Plain {@link Result} of a table program that was submitted to a cluster. It only remembers
+ * the cluster id and the web interface URL of the cluster the program was sent to.
+ *
+ * @param <C> type of the cluster id to which this result belongs
+ */
+public class BasicResult<C> implements Result<C> {
+
+	protected C clusterId;
+	protected String webInterfaceUrl;
+
+	/**
+	 * Stores the cluster information once.
+	 *
+	 * @throws IllegalStateException if a cluster id or a web interface URL has been stored before
+	 */
+	@Override
+	public void setClusterInformation(C clusterId, String webInterfaceUrl) {
+		final boolean nothingStoredYet = this.clusterId == null && this.webInterfaceUrl == null;
+		if (!nothingStoredYet) {
+			throw new IllegalStateException("Cluster information is already present.");
+		}
+		this.clusterId = clusterId;
+		this.webInterfaceUrl = webInterfaceUrl;
+	}
+
+	/**
+	 * Returns the stored cluster id.
+	 *
+	 * @throws IllegalStateException if no cluster id is present
+	 */
+	public C getClusterId() {
+		if (clusterId != null) {
+			return clusterId;
+		}
+		throw new IllegalStateException("Cluster ID has not been set.");
+	}
+
+	/**
+	 * Returns the stored web interface URL of the cluster.
+	 *
+	 * @throws IllegalStateException if no web interface URL is present
+	 */
+	public String getWebInterfaceUrl() {
+		if (webInterfaceUrl != null) {
+			return webInterfaceUrl;
+		}
+		throw new IllegalStateException("Cluster web interface URL has not been set.");
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/695bc56a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/result/ChangelogCollectStreamResult.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/result/ChangelogCollectStreamResult.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/result/ChangelogCollectStreamResult.java
new file mode 100644
index 0000000..4e76888
--- /dev/null
+++ b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/result/ChangelogCollectStreamResult.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.client.gateway.local.result;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.table.client.gateway.TypedResult;
+import org.apache.flink.types.Row;
+
+import java.net.InetAddress;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Collects results and returns them as a changelog.
+ *
+ * <p>Incoming change records are buffered by {@link #processRecord(Tuple2)} and handed out in
+ * batches by {@link #retrieveChanges()}. Both methods synchronize on {@code resultLock}. Once
+ * the buffer holds {@code CHANGE_RECORD_BUFFER_SIZE} records, the producing side waits until
+ * the buffer has been drained.
+ *
+ * @param <C> type of the cluster id to which this result belongs
+ */
+public class ChangelogCollectStreamResult<C> extends CollectStreamResult<C> implements ChangelogResult<C> {
+
+	/** Number of buffered change records at which the producing side starts waiting. */
+	private static final int CHANGE_RECORD_BUFFER_SIZE = 5_000;
+
+	/** Change records that have not been retrieved yet; guarded by {@code resultLock}. */
+	private final List<Tuple2<Boolean, Row>> changeRecordBuffer;
+
+	public ChangelogCollectStreamResult(TypeInformation<Row> outputType, ExecutionConfig config,
+			InetAddress gatewayAddress, int gatewayPort) {
+		super(outputType, config, gatewayAddress, gatewayPort);
+
+		// prepare for changelog
+		changeRecordBuffer = new ArrayList<>();
+	}
+
+	@Override
+	public boolean isMaterialized() {
+		return false;
+	}
+
+	/**
+	 * Returns all change records buffered since the last call and clears the buffer.
+	 *
+	 * <p>While records are still being retrieved and no execution exception is present, an empty
+	 * result is returned if nothing is buffered. After retrieval has stopped, remaining records
+	 * are still delivered; if there are none, the outcome is decided by
+	 * {@code handleMissingResult()}.
+	 */
+	@Override
+	public TypedResult<List<Tuple2<Boolean, Row>>> retrieveChanges() {
+		synchronized (resultLock) {
+			// retrieval thread is alive return a record if available
+			// but the program must not have failed
+			if (isRetrieving() && executionException == null) {
+				if (changeRecordBuffer.isEmpty()) {
+					return TypedResult.empty();
+				} else {
+					final List<Tuple2<Boolean, Row>> change = new ArrayList<>(changeRecordBuffer);
+					changeRecordBuffer.clear();
+					// wake up the producing side in case it waits for free buffer space
+					resultLock.notify();
+					return TypedResult.payload(change);
+				}
+			}
+			// retrieval thread is dead but there is still a record to be delivered
+			else if (!isRetrieving() && !changeRecordBuffer.isEmpty()) {
+				final List<Tuple2<Boolean, Row>> change = new ArrayList<>(changeRecordBuffer);
+				changeRecordBuffer.clear();
+				return TypedResult.payload(change);
+			}
+			// no results can be returned anymore
+			else {
+				return handleMissingResult();
+			}
+		}
+	}
+
+	// --------------------------------------------------------------------------------------------
+
+	/**
+	 * Appends a change record to the buffer. If the buffer is full, the calling thread waits
+	 * until the buffer has been drained or the thread is interrupted.
+	 */
+	@Override
+	protected void processRecord(Tuple2<Boolean, Row> change) {
+		synchronized (resultLock) {
+			// wait while the buffer is full; the loop also guards against spurious wake-ups
+			while (changeRecordBuffer.size() >= CHANGE_RECORD_BUFFER_SIZE) {
+				try {
+					resultLock.wait();
+				} catch (InterruptedException e) {
+					// stop waiting; the interrupt is otherwise ignored as before
+					break;
+				}
+			}
+			// always keep the record, otherwise the change that triggered the wait would be lost
+			changeRecordBuffer.add(change);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/695bc56a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/result/ChangelogResult.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/result/ChangelogResult.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/result/ChangelogResult.java
new file mode 100644
index 0000000..eb6d6dd
--- /dev/null
+++ b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/result/ChangelogResult.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.client.gateway.local.result;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.table.client.gateway.TypedResult;
+import org.apache.flink.types.Row;
+
+import java.util.List;
+
+/**
+ * A result that is represented as a changelog consisting of insert and delete records.
+ *
+ * @param <C> type of the cluster id to which this result belongs
+ */
+public interface ChangelogResult<C> extends DynamicResult<C> {
+
+	/**
+	 * Retrieves the result records that are currently available.
+	 *
+	 * <p>NOTE(review): the {@code Boolean} of each tuple presumably marks the record as an
+	 * insert or a delete; the encoding is not defined here and should be confirmed against
+	 * the producing sink.
+	 *
+	 * @return typed result wrapping the list of available change records
+	 */
+	TypedResult<List<Tuple2<Boolean, Row>>> retrieveChanges();
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/695bc56a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/result/CollectStreamResult.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/result/CollectStreamResult.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/result/CollectStreamResult.java
new file mode 100644
index 0000000..656057b
--- /dev/null
+++ b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/result/CollectStreamResult.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.client.gateway.local.result;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamUtils;
+import org.apache.flink.streaming.experimental.SocketStreamIterator;
+import org.apache.flink.table.client.SqlClientException;
+import org.apache.flink.table.client.gateway.SqlExecutionException;
+import org.apache.flink.table.client.gateway.TypedResult;
+import org.apache.flink.table.client.gateway.local.CollectStreamTableSink;
+import org.apache.flink.table.client.gateway.local.ProgramDeployer;
+import org.apache.flink.table.sinks.TableSink;
+import org.apache.flink.types.Row;
+
+import java.io.IOException;
+import java.net.InetAddress;
+
+/**
+ * A result that works similarly to {@link DataStreamUtils#collect(DataStream)}.
+ *
+ * <p>The result opens a socket based iterator and offers a table sink that sends records to
+ * that socket. A retrieval thread reads the records from the iterator and passes them to
+ * {@link #processRecord(Tuple2)}, while a monitoring thread runs the program deployer and
+ * remembers a failure of the program.
+ *
+ * @param <C> type of the cluster id to which this result belongs
+ */
+public abstract class CollectStreamResult<C> extends BasicResult<C> implements DynamicResult<C> {
+
+	private final TypeInformation<Row> outputType;
+	private final SocketStreamIterator<Tuple2<Boolean, Row>> iterator;
+	private final CollectStreamTableSink collectTableSink;
+	private final ResultRetrievalThread retrievalThread;
+	private final JobMonitoringThread monitoringThread;
+	private ProgramDeployer<C> deployer;
+
+	/** Lock that subclasses use for guarding their record buffers. */
+	protected final Object resultLock;
+
+	/**
+	 * Failure of the submitted program, {@code null} if none occurred so far. The field is
+	 * written by the monitoring thread and read by the threads that fetch results, therefore
+	 * it is volatile.
+	 */
+	protected volatile SqlExecutionException executionException;
+
+	/**
+	 * Creates the socket iterator, the table sink, and the (not yet started) worker threads.
+	 *
+	 * @param outputType     type of the rows of this result
+	 * @param config         execution config used for creating the record serializer
+	 * @param gatewayAddress address the socket iterator binds to
+	 * @param gatewayPort    port the socket iterator binds to
+	 * @throws SqlClientException if the socket for result retrieval could not be started
+	 */
+	public CollectStreamResult(TypeInformation<Row> outputType, ExecutionConfig config,
+			InetAddress gatewayAddress, int gatewayPort) {
+		this.outputType = outputType;
+
+		resultLock = new Object();
+
+		// create socket stream iterator
+		final TypeInformation<Tuple2<Boolean, Row>> socketType = Types.TUPLE(Types.BOOLEAN, outputType);
+		final TypeSerializer<Tuple2<Boolean, Row>> serializer = socketType.createSerializer(config);
+		try {
+			// pass gateway port and address such that iterator knows where to bind to
+			iterator = new SocketStreamIterator<>(gatewayPort, gatewayAddress, serializer);
+		} catch (IOException e) {
+			throw new SqlClientException("Could not start socket for result retrieval.", e);
+		}
+
+		// create table sink
+		// pass binding address and port such that sink knows where to send to
+		collectTableSink = new CollectStreamTableSink(iterator.getBindAddress(), iterator.getPort(), serializer);
+		retrievalThread = new ResultRetrievalThread();
+		monitoringThread = new JobMonitoringThread();
+	}
+
+	@Override
+	public TypeInformation<Row> getOutputType() {
+		return outputType;
+	}
+
+	/**
+	 * Starts the retrieval thread first and then runs the given deployer in the monitoring thread.
+	 */
+	@Override
+	public void startRetrieval(ProgramDeployer<C> deployer) {
+		// start listener thread
+		retrievalThread.start();
+
+		// start deployer
+		// the field is assigned before the monitoring thread starts, so that thread sees it
+		this.deployer = deployer;
+		monitoringThread.start();
+	}
+
+	@Override
+	public TableSink<?> getTableSink() {
+		return collectTableSink;
+	}
+
+	/**
+	 * Stops the retrieval: marks the retrieval thread as not running, interrupts both worker
+	 * threads, and closes the socket iterator.
+	 */
+	@Override
+	public void close() {
+		retrievalThread.isRunning = false;
+		retrievalThread.interrupt();
+		monitoringThread.interrupt();
+		iterator.close();
+	}
+
+	// --------------------------------------------------------------------------------------------
+
+	/**
+	 * Decides what to report if no record can be returned.
+	 *
+	 * @return an empty result while the monitoring thread is still alive, otherwise end-of-stream
+	 * @throws SqlExecutionException if the monitoring thread has ended and recorded a failure
+	 */
+	protected <T> TypedResult<T> handleMissingResult() {
+		// check if the monitoring thread is still there
+		// we need to wait until we know what is going on
+		if (monitoringThread.isAlive()) {
+			return TypedResult.empty();
+		}
+		// the job finished with an exception
+		else if (executionException != null) {
+			throw executionException;
+		}
+		// we assume that a bounded job finished
+		else {
+			return TypedResult.endOfStream();
+		}
+	}
+
+	/**
+	 * Returns whether the retrieval thread is still flagged as running.
+	 */
+	protected boolean isRetrieving() {
+		return retrievalThread.isRunning;
+	}
+
+	/**
+	 * Handles a single record read from the socket iterator; called by the retrieval thread.
+	 */
+	protected abstract void processRecord(Tuple2<Boolean, Row> change);
+
+	// --------------------------------------------------------------------------------------------
+
+	/**
+	 * Thread that runs the program deployer and records a {@link SqlExecutionException}.
+	 */
+	private class JobMonitoringThread extends Thread {
+
+		@Override
+		public void run() {
+			try {
+				deployer.run();
+			} catch (SqlExecutionException e) {
+				executionException = e;
+			}
+		}
+	}
+
+	// --------------------------------------------------------------------------------------------
+
+	/**
+	 * Thread that reads records from the socket iterator until it is stopped, the iterator is
+	 * exhausted, or a runtime exception occurs.
+	 */
+	private class ResultRetrievalThread extends Thread {
+
+		public volatile boolean isRunning = true;
+
+		@Override
+		public void run() {
+			try {
+				while (isRunning && iterator.hasNext()) {
+					final Tuple2<Boolean, Row> change = iterator.next();
+					processRecord(change);
+				}
+			} catch (RuntimeException e) {
+				// ignore socket exceptions
+			}
+
+			// no result anymore
+			// either the job is done or an error occurred
+			isRunning = false;
+		}
+	}
+}