Posted to commits@flink.apache.org by tw...@apache.org on 2018/06/11 13:41:58 UTC

flink git commit: [FLINK-8861] [sql-client] Add support for batch queries in SQL Client

Repository: flink
Updated Branches:
  refs/heads/master dff03cac0 -> e8e74a648


[FLINK-8861] [sql-client] Add support for batch queries in SQL Client

This closes #5660.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e8e74a64
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e8e74a64
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e8e74a64

Branch: refs/heads/master
Commit: e8e74a648a134fe054081a49a36b8c45f30c21bc
Parents: dff03ca
Author: Xingcan Cui <xi...@gmail.com>
Authored: Thu Mar 8 01:12:55 2018 +0800
Committer: Timo Walther <tw...@apache.org>
Committed: Mon Jun 11 15:40:34 2018 +0200

----------------------------------------------------------------------
 docs/dev/table/sqlClient.md                     |  20 +--
 .../flink/table/client/config/Execution.java    |  16 +-
 .../gateway/local/CollectBatchTableSink.java    |  82 ++++++++++
 .../gateway/local/CollectStreamResult.java      |  10 +-
 .../gateway/local/CollectStreamTableSink.java   |   2 +-
 .../client/gateway/local/DynamicResult.java     |   6 +-
 .../client/gateway/local/LocalExecutor.java     |  61 +------
 .../local/MaterializedCollectBatchResult.java   | 159 +++++++++++++++++++
 .../client/gateway/local/ProgramDeployer.java   | 123 ++++++++++++++
 .../table/client/gateway/local/ResultStore.java |  31 ++--
 .../gateway/local/LocalExecutorITCase.java      |  84 +++++++---
 .../resources/test-sql-client-defaults.yaml     |   4 +-
 12 files changed, 479 insertions(+), 119 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/e8e74a64/docs/dev/table/sqlClient.md
----------------------------------------------------------------------
diff --git a/docs/dev/table/sqlClient.md b/docs/dev/table/sqlClient.md
index de7fb65..4c27356 100644
--- a/docs/dev/table/sqlClient.md
+++ b/docs/dev/table/sqlClient.md
@@ -184,13 +184,13 @@ tables:
 # Execution properties allow for changing the behavior of a table program.
 
 execution:
-  type: streaming
-  time-characteristic: event-time
-  parallelism: 1
-  max-parallelism: 16
-  min-idle-state-retention: 0
-  max-idle-state-retention: 0
-  result-mode: table
+  type: streaming                   # required: execution mode either 'batch' or 'streaming'
+  time-characteristic: event-time   # optional: 'processing-time' or 'event-time' (default)
+  parallelism: 1                    # optional: Flink's parallelism (1 by default)
+  max-parallelism: 16               # optional: Flink's maximum parallelism (128 by default)
+  min-idle-state-retention: 0       # optional: table program's minimum idle state time
+  max-idle-state-retention: 0       # optional: table program's maximum idle state time
+  result-mode: table                # required: either 'table' or 'changelog'
 
 # Deployment properties allow for describing the cluster to which table programs are submitted.
 
@@ -201,8 +201,8 @@ deployment:
 This configuration:
 
 - defines an environment with a table source `MyTableName` that reads from a CSV file,
-- specifies a parallelism of 1 for queries executed in this environment,
-- specifies an even-time characteristic, and
+- specifies a parallelism of 1 for queries executed in this streaming environment,
+- specifies an event-time characteristic, and
 - runs queries in the `table` result mode.
 
 Depending on the use case, a configuration can be split into multiple files. Therefore, environment files can be created for general purposes (*defaults environment file* using `--defaults`) as well as on a per-session basis (*session environment file* using `--environment`). Every CLI session is initialized with the default properties followed by the session properties. For example, the defaults environment file could specify all table sources that should be available for querying in every session whereas the session environment file only declares a specific state retention time and parallelism. Both default and session environment files can be passed when starting the CLI application. If no default environment file has been specified, the SQL Client searches for `./conf/sql-client-defaults.yaml` in Flink's configuration directory.
@@ -213,6 +213,8 @@ Depending on the use case, a configuration can be split into multiple files. The
 CLI commands > session environment file > defaults environment file
 {% endhighlight %}
 
+Queries that are executed in a batch environment can only be retrieved using the `table` result mode.
+
 {% top %}
 
 ### Dependencies

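A minimal session environment file that exercises the new batch support could
look as follows (a sketch, assuming the table sources are declared in the
defaults file; `type` and `result-mode` are exactly the two properties the
updated documentation marks as required):

    execution:
      type: batch          # the execution mode added by this commit
      result-mode: table   # the only result mode supported for batch queries
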
http://git-wip-us.apache.org/repos/asf/flink/blob/e8e74a64/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Execution.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Execution.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Execution.java
index d84c35b..3ca99c1 100644
--- a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Execution.java
+++ b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Execution.java
@@ -46,16 +46,10 @@ public class Execution {
 
 	public boolean isStreamingExecution() {
 		return Objects.equals(
-			properties.getOrDefault(PropertyStrings.EXECUTION_TYPE, PropertyStrings.EXECUTION_TYPE_VALUE_STREAMING),
+			properties.get(PropertyStrings.EXECUTION_TYPE),
 			PropertyStrings.EXECUTION_TYPE_VALUE_STREAMING);
 	}
 
-	public boolean isBatchExecution() {
-		return Objects.equals(
-			properties.getOrDefault(PropertyStrings.EXECUTION_TYPE, PropertyStrings.EXECUTION_TYPE_VALUE_STREAMING),
-			PropertyStrings.EXECUTION_TYPE_VALUE_BATCH);
-	}
-
 	public TimeCharacteristic getTimeCharacteristic() {
 		final String s = properties.getOrDefault(
 			PropertyStrings.EXECUTION_TIME_CHARACTERISTIC,
@@ -88,10 +82,16 @@ public class Execution {
 
 	public boolean isChangelogMode() {
 		return Objects.equals(
-			properties.getOrDefault(PropertyStrings.EXECUTION_RESULT_MODE, PropertyStrings.EXECUTION_RESULT_MODE_VALUE_CHANGELOG),
+			properties.get(PropertyStrings.EXECUTION_RESULT_MODE),
 			PropertyStrings.EXECUTION_RESULT_MODE_VALUE_CHANGELOG);
 	}
 
+	public boolean isTableMode() {
+		return Objects.equals(
+				properties.get(PropertyStrings.EXECUTION_RESULT_MODE),
+				PropertyStrings.EXECUTION_RESULT_MODE_VALUE_TABLE);
+	}
+
 	public Map<String, String> toProperties() {
 		final Map<String, String> copy = new HashMap<>();
 		properties.forEach((k, v) -> copy.put(PropertyStrings.EXECUTION + "." + k, v));

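Dropping the getOrDefault fallbacks turns these properties from defaulted into
required ones: if 'execution.type' or 'execution.result-mode' is missing, all
of the is* checks now return false. A small sketch of the new semantics (the
key "type" is spelled out literally here; the real code reads it through
PropertyStrings constants):

    import java.util.HashMap;
    import java.util.Map;
    import java.util.Objects;

    Map<String, String> properties = new HashMap<>();
    // nothing set: Objects.equals(null, "streaming") is false,
    // so streaming is no longer silently assumed
    boolean streaming = Objects.equals(properties.get("type"), "streaming"); // false
    properties.put("type", "batch");
    streaming = Objects.equals(properties.get("type"), "streaming"); // still false
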
http://git-wip-us.apache.org/repos/asf/flink/blob/e8e74a64/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/CollectBatchTableSink.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/CollectBatchTableSink.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/CollectBatchTableSink.java
new file mode 100644
index 0000000..d299749
--- /dev/null
+++ b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/CollectBatchTableSink.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.client.gateway.local;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.Utils;
+import org.apache.flink.table.sinks.BatchTableSink;
+import org.apache.flink.table.sinks.TableSink;
+import org.apache.flink.types.Row;
+
+/**
+ * Table sink for collecting the results locally, all at once, using accumulators.
+ */
+public class CollectBatchTableSink implements BatchTableSink<Row> {
+
+	private final String accumulatorName;
+	private final TypeSerializer<Row> serializer;
+
+	private String[] fieldNames;
+	private TypeInformation<?>[] fieldTypes;
+
+	public CollectBatchTableSink(String accumulatorName, TypeSerializer<Row> serializer) {
+		this.accumulatorName = accumulatorName;
+		this.serializer = serializer;
+	}
+
+	@Override
+	public TypeInformation<Row> getOutputType() {
+		return Types.ROW_NAMED(fieldNames, fieldTypes);
+	}
+
+	@Override
+	public String[] getFieldNames() {
+		return fieldNames;
+	}
+
+	@Override
+	public TypeInformation<?>[] getFieldTypes() {
+		return fieldTypes;
+	}
+
+	@Override
+	public TableSink<Row> configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
+		final CollectBatchTableSink copy = new CollectBatchTableSink(accumulatorName, serializer);
+		copy.fieldNames = fieldNames;
+		copy.fieldTypes = fieldTypes;
+		return copy;
+	}
+
+	@Override
+	public void emitDataSet(DataSet<Row> dataSet) {
+		dataSet
+			.output(new Utils.CollectHelper<>(accumulatorName, serializer))
+			.name("SQL Client Batch Collect Sink");
+	}
+
+	/**
+	 * Returns the serializer for deserializing the collected result.
+	 */
+	public TypeSerializer<Row> getSerializer() {
+		return serializer;
+	}
+}

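Utils.CollectHelper is the same accumulator-backed output format that
DataSet#collect() uses internally: every row is serialized into a
SerializedListAccumulator under the given name, and the client later reads it
from the JobExecutionResult. A hedged wiring sketch (the field name and the
fresh ExecutionConfig are illustrative only):

    ExecutionConfig config = new ExecutionConfig();
    TypeInformation<Row> rowType = Types.ROW_NAMED(new String[]{"id"}, Types.INT);
    CollectBatchTableSink sink = new CollectBatchTableSink(
        new AbstractID().toString(),        // unique accumulator name
        rowType.createSerializer(config));
    // configure() hands back a configured copy; the original stays untouched
    TableSink<Row> configured = sink.configure(
        new String[]{"id"}, new TypeInformation<?>[]{Types.INT});
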
http://git-wip-us.apache.org/repos/asf/flink/blob/e8e74a64/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/CollectStreamResult.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/CollectStreamResult.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/CollectStreamResult.java
index 83f9ff2..b7a0e79 100644
--- a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/CollectStreamResult.java
+++ b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/CollectStreamResult.java
@@ -47,7 +47,7 @@ public abstract class CollectStreamResult<C> implements DynamicResult<C> {
 	private final CollectStreamTableSink collectTableSink;
 	private final ResultRetrievalThread retrievalThread;
 	private final JobMonitoringThread monitoringThread;
-	private Runnable program;
+	private ProgramDeployer<C> deployer;
 	private C clusterId;
 
 	protected final Object resultLock;
@@ -90,12 +90,12 @@ public abstract class CollectStreamResult<C> implements DynamicResult<C> {
 	}
 
 	@Override
-	public void startRetrieval(Runnable program) {
+	public void startRetrieval(ProgramDeployer<C> deployer) {
 		// start listener thread
 		retrievalThread.start();
 
-		// start program
-		this.program = program;
+		// start deployer
+		this.deployer = deployer;
 		monitoringThread.start();
 	}
 
@@ -143,7 +143,7 @@ public abstract class CollectStreamResult<C> implements DynamicResult<C> {
 		@Override
 		public void run() {
 			try {
-				program.run();
+				deployer.run();
 			} catch (SqlExecutionException e) {
 				executionException = e;
 			}

http://git-wip-us.apache.org/repos/asf/flink/blob/e8e74a64/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/CollectStreamTableSink.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/CollectStreamTableSink.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/CollectStreamTableSink.java
index c904d8e..c71be18 100644
--- a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/CollectStreamTableSink.java
+++ b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/CollectStreamTableSink.java
@@ -77,7 +77,7 @@ public class CollectStreamTableSink implements RetractStreamTableSink<Row> {
 		// add sink
 		stream
 			.addSink(new CollectSink<>(targetAddress, targetPort, serializer))
-			.name("SQL Client Collect Sink")
+			.name("SQL Client Stream Collect Sink")
 			.setParallelism(1);
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/e8e74a64/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/DynamicResult.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/DynamicResult.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/DynamicResult.java
index 2042e1a..7c6f8c2 100644
--- a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/DynamicResult.java
+++ b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/DynamicResult.java
@@ -27,7 +27,7 @@ import org.apache.flink.types.Row;
  *
  * <p>Note: Make sure to call close() after the result is not needed anymore.
  *
- * @param <C> cluster id to which this result belongs to
+ * @param <C> type of the cluster id to which this result belongs
  */
 public interface DynamicResult<C> {
 
@@ -48,9 +48,9 @@ public interface DynamicResult<C> {
 	TypeInformation<Row> getOutputType();
 
 	/**
-	 * Starts the table program using the given runnable and monitors it's execution.
+	 * Starts the table program using the given deployer and monitors its execution.
 	 */
-	void startRetrieval(Runnable program);
+	void startRetrieval(ProgramDeployer<C> deployer);
 
 	/**
 	 * Returns the table sink required by this result type.

http://git-wip-us.apache.org/repos/asf/flink/blob/e8e74a64/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
index 0329648..bd463ac 100644
--- a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
+++ b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
@@ -26,7 +26,6 @@ import org.apache.flink.client.cli.CustomCommandLine;
 import org.apache.flink.client.deployment.ClusterDescriptor;
 import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.client.program.JobWithJars;
-import org.apache.flink.client.program.rest.RestClusterClient;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.core.fs.FileSystem;
@@ -353,17 +352,10 @@ public class LocalExecutor implements Executor {
 		resultStore.storeResult(resultId, result);
 
 		// create execution
-		final Runnable program = () -> {
-			LOG.info("Submitting job {} for query {}`", jobGraph.getJobID(), jobName);
-			if (LOG.isDebugEnabled()) {
-				LOG.debug("Submitting job {} with the following environment: \n{}",
-					jobGraph.getJobID(), context.getMergedEnvironment());
-			}
-			deployJob(context, jobGraph, result);
-		};
+		final ProgramDeployer<T> deployer = new ProgramDeployer<>(context, jobName, jobGraph, result);
 
 		// start result retrieval
-		result.startRetrieval(program);
+		result.startRetrieval(deployer);
 
 		return new ResultDescriptor(
 			resultId,
@@ -397,55 +389,6 @@ public class LocalExecutor implements Executor {
 		return executionContext;
 	}
 
-	/**
-	 * Deploys a job. Depending on the deployment create a new job cluster. It saves cluster id in
-	 * the result and blocks until job completion.
-	 */
-	private <T> void deployJob(ExecutionContext<T> context, JobGraph jobGraph, DynamicResult<T> result) {
-		// create or retrieve cluster and deploy job
-		try (final ClusterDescriptor<T> clusterDescriptor = context.createClusterDescriptor()) {
-			ClusterClient<T> clusterClient = null;
-			try {
-				// new cluster
-				if (context.getClusterId() == null) {
-					// deploy job cluster with job attached
-					clusterClient = clusterDescriptor.deployJobCluster(context.getClusterSpec(), jobGraph, false);
-					// save the new cluster id
-					result.setClusterId(clusterClient.getClusterId());
-					// we need to hard cast for now
-					((RestClusterClient<T>) clusterClient)
-						.requestJobResult(jobGraph.getJobID())
-						.get()
-						.toJobExecutionResult(context.getClassLoader()); // throws exception if job fails
-				}
-				// reuse existing cluster
-				else {
-					// retrieve existing cluster
-					clusterClient = clusterDescriptor.retrieve(context.getClusterId());
-					// save the cluster id
-					result.setClusterId(clusterClient.getClusterId());
-					// submit the job
-					clusterClient.setDetached(false);
-					clusterClient.submitJob(jobGraph, context.getClassLoader()); // throws exception if job fails
-				}
-			} catch (Exception e) {
-				throw new SqlExecutionException("Could not retrieve or create a cluster.", e);
-			} finally {
-				try {
-					if (clusterClient != null) {
-						clusterClient.shutdown();
-					}
-				} catch (Exception e) {
-					// ignore
-				}
-			}
-		} catch (SqlExecutionException e) {
-			throw e;
-		} catch (Exception e) {
-			throw new SqlExecutionException("Could not locate a cluster.", e);
-		}
-	}
-
 	// --------------------------------------------------------------------------------------------
 
 	private static List<URL> discoverDependencies(List<URL> jars, List<URL> libraries) {

http://git-wip-us.apache.org/repos/asf/flink/blob/e8e74a64/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/MaterializedCollectBatchResult.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/MaterializedCollectBatchResult.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/MaterializedCollectBatchResult.java
new file mode 100644
index 0000000..3bdc1fa
--- /dev/null
+++ b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/MaterializedCollectBatchResult.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.client.gateway.local;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.accumulators.SerializedListAccumulator;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.client.gateway.SqlExecutionException;
+import org.apache.flink.table.client.gateway.TypedResult;
+import org.apache.flink.table.sinks.TableSink;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.AbstractID;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Collects results using accumulators and returns them as table snapshots.
+ */
+public class MaterializedCollectBatchResult<C> implements MaterializedResult<C> {
+
+	private final TypeInformation<Row> outputType;
+	private final String accumulatorName;
+	private final CollectBatchTableSink tableSink;
+	private final Object resultLock;
+	private final Thread retrievalThread;
+
+	private ProgramDeployer<C> deployer;
+	private C clusterId;
+	private int pageSize;
+	private int pageCount;
+	private SqlExecutionException executionException;
+	private List<Row> resultTable;
+
+	private volatile boolean snapshotted = false;
+
+	public MaterializedCollectBatchResult(TypeInformation<Row> outputType, ExecutionConfig config) {
+		this.outputType = outputType;
+
+		accumulatorName = new AbstractID().toString();
+		tableSink = new CollectBatchTableSink(accumulatorName, outputType.createSerializer(config));
+		resultLock = new Object();
+		retrievalThread = new ResultRetrievalThread();
+
+		pageCount = 0;
+	}
+
+	@Override
+	public void setClusterId(C clusterId) {
+		if (this.clusterId != null) {
+			throw new IllegalStateException("Cluster id is already present.");
+		}
+		this.clusterId = clusterId;
+	}
+
+	@Override
+	public boolean isMaterialized() {
+		return true;
+	}
+
+	@Override
+	public TypeInformation<Row> getOutputType() {
+		return outputType;
+	}
+
+	@Override
+	public void startRetrieval(ProgramDeployer<C> deployer) {
+		this.deployer = deployer;
+		retrievalThread.start();
+	}
+
+	@Override
+	public TableSink<?> getTableSink() {
+		return tableSink;
+	}
+
+	@Override
+	public void close() {
+		retrievalThread.interrupt();
+	}
+
+	@Override
+	public List<Row> retrievePage(int page) {
+		synchronized (resultLock) {
+			if (page <= 0 || page > pageCount) {
+				throw new SqlExecutionException("Invalid page '" + page + "'.");
+			}
+			return resultTable.subList(pageSize * (page - 1), Math.min(resultTable.size(), page * pageSize));
+		}
+	}
+
+	@Override
+	public TypedResult<Integer> snapshot(int pageSize) {
+		synchronized (resultLock) {
+			// wait for a result
+			if (retrievalThread.isAlive() && null == resultTable) {
+				return TypedResult.empty();
+			}
+			// the job finished with an exception
+			else if (executionException != null) {
+				throw executionException;
+			}
+			// we return a payload result the first time and end-of-stream for every subsequent
+			// call, as if the results were retrieved dynamically
+			else if (!snapshotted) {
+				snapshotted = true;
+				this.pageSize = pageSize;
+				pageCount = Math.max(1, (int) Math.ceil(((double) resultTable.size() / pageSize)));
+				return TypedResult.payload(pageCount);
+			} else {
+				return TypedResult.endOfStream();
+			}
+		}
+	}
+
+	// --------------------------------------------------------------------------------------------
+
+	private class ResultRetrievalThread extends Thread {
+
+		@Override
+		public void run() {
+			try {
+				deployer.run();
+				final JobExecutionResult result = deployer.fetchExecutionResult();
+				final ArrayList<byte[]> accResult = result.getAccumulatorResult(accumulatorName);
+				if (accResult == null) {
+					throw new SqlExecutionException("Could not retrieve the result from the accumulator.");
+				}
+				final List<Row> resultTable = SerializedListAccumulator.deserializeList(accResult, tableSink.getSerializer());
+				// sets the result table all at once
+				synchronized (resultLock) {
+					MaterializedCollectBatchResult.this.resultTable = resultTable;
+				}
+			} catch (ClassNotFoundException | IOException e) {
+				executionException = new SqlExecutionException("Serialization error while deserializing collected data.", e);
+			} catch (SqlExecutionException e) {
+				executionException = e;
+			}
+		}
+	}
+}

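The paging logic is plain arithmetic over the materialized list; a worked
example of the bounds computed by snapshot() and retrievePage() (numbers chosen
for illustration):

    int rows = 7, pageSize = 2;
    int pageCount = Math.max(1, (int) Math.ceil((double) rows / pageSize)); // 4
    // retrievePage(4), the last page, maps to subList(6, min(7, 8)),
    // i.e. just the single trailing row
    int from = pageSize * (4 - 1);           // 6
    int to = Math.min(rows, 4 * pageSize);   // 7

Note the snapshot contract: the first call after the job has finished returns
the page count as a payload, and every later call returns end-of-stream, so the
CLI can treat the one-shot batch result like a stream that has stopped
changing.
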
http://git-wip-us.apache.org/repos/asf/flink/blob/e8e74a64/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ProgramDeployer.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ProgramDeployer.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ProgramDeployer.java
new file mode 100644
index 0000000..5c5cf98
--- /dev/null
+++ b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ProgramDeployer.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.client.gateway.local;
+
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.client.deployment.ClusterDescriptor;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.client.program.rest.RestClusterClient;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.table.client.gateway.SqlExecutionException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingDeque;
+
+/**
+ * Helper class for deploying a table program on a cluster.
+ */
+public class ProgramDeployer<C> implements Runnable {
+	private static final Logger LOG = LoggerFactory.getLogger(ProgramDeployer.class);
+
+	private final ExecutionContext<C> context;
+	private final JobGraph jobGraph;
+	private final String jobName;
+	private final DynamicResult<C> result;
+	private final BlockingQueue<JobExecutionResult> executionResultBucket;
+
+	public ProgramDeployer(
+			ExecutionContext<C> context,
+			String jobName,
+			JobGraph jobGraph,
+			DynamicResult<C> result) {
+		this.context = context;
+		this.jobGraph = jobGraph;
+		this.jobName = jobName;
+		this.result = result;
+		executionResultBucket = new LinkedBlockingDeque<>(1);
+	}
+
+	@Override
+	public void run() {
+		LOG.info("Submitting job {} for query {}", jobGraph.getJobID(), jobName);
+		if (LOG.isDebugEnabled()) {
+			LOG.debug("Submitting job {} with the following environment: \n{}",
+					jobGraph.getJobID(), context.getMergedEnvironment());
+		}
+		executionResultBucket.add(deployJob(context, jobGraph, result));
+	}
+
+	public JobExecutionResult fetchExecutionResult() {
+		return executionResultBucket.poll();
+	}
+
+	/**
+	 * Deploys a job. Depending on the deployment, this might create a new job cluster. It saves
+	 * the cluster id in the result and blocks until job completion.
+	 */
+	private <T> JobExecutionResult deployJob(ExecutionContext<T> context, JobGraph jobGraph, DynamicResult<T> result) {
+		// create or retrieve cluster and deploy job
+		try (final ClusterDescriptor<T> clusterDescriptor = context.createClusterDescriptor()) {
+			ClusterClient<T> clusterClient = null;
+			try {
+				// new cluster
+				if (context.getClusterId() == null) {
+					// deploy job cluster with job attached
+					clusterClient = clusterDescriptor.deployJobCluster(context.getClusterSpec(), jobGraph, false);
+					// save the new cluster id
+					result.setClusterId(clusterClient.getClusterId());
+					// we need to hard cast for now
+					return ((RestClusterClient<T>) clusterClient)
+							.requestJobResult(jobGraph.getJobID())
+							.get()
+							.toJobExecutionResult(context.getClassLoader()); // throws exception if job fails
+				}
+				// reuse existing cluster
+				else {
+					// retrieve existing cluster
+					clusterClient = clusterDescriptor.retrieve(context.getClusterId());
+					// save the cluster id
+					result.setClusterId(clusterClient.getClusterId());
+					// submit the job
+					clusterClient.setDetached(false);
+					return clusterClient
+						.submitJob(jobGraph, context.getClassLoader())
+						.getJobExecutionResult(); // throws exception if job fails
+				}
+			} catch (Exception e) {
+				throw new SqlExecutionException("Could not retrieve or create a cluster.", e);
+			} finally {
+				try {
+					if (clusterClient != null) {
+						clusterClient.shutdown();
+					}
+				} catch (Exception e) {
+					// ignore
+				}
+			}
+		} catch (SqlExecutionException e) {
+			throw e;
+		} catch (Exception e) {
+			throw new SqlExecutionException("Could not locate a cluster.", e);
+		}
+	}
+}
+

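Since ProgramDeployer implements Runnable, the streaming results can keep
handing it to their monitoring thread unchanged, while the batch result
additionally pulls the JobExecutionResult to read the accumulator.
fetchExecutionResult() is a non-blocking poll(), so it only works when invoked
on the same thread after run() has returned, which is what the batch retrieval
thread does (sketch):

    deployer.run(); // attached submission: blocks until the job completes
    JobExecutionResult result = deployer.fetchExecutionResult(); // non-null here
    ArrayList<byte[]> serialized = result.getAccumulatorResult(accumulatorName);
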
http://git-wip-us.apache.org/repos/asf/flink/blob/e8e74a64/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ResultStore.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ResultStore.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ResultStore.java
index 7e17ee7..f70378c 100644
--- a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ResultStore.java
+++ b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ResultStore.java
@@ -44,13 +44,11 @@ import java.util.Map;
  */
 public class ResultStore {
 
-	private Configuration flinkConfig;
-
-	private Map<String, DynamicResult<?>> results;
+	private final Configuration flinkConfig;
+	private final Map<String, DynamicResult<?>> results;
 
 	public ResultStore(Configuration flinkConfig) {
 		this.flinkConfig = flinkConfig;
-
 		results = new HashMap<>();
 	}
 
@@ -58,19 +56,26 @@ public class ResultStore {
 	 * Creates a result. Might start threads or open sockets so every created result must be closed.
 	 */
 	public <T> DynamicResult<T> createResult(Environment env, TableSchema schema, ExecutionConfig config) {
-		if (!env.getExecution().isStreamingExecution()) {
-			throw new SqlExecutionException("Emission is only supported in streaming environments yet.");
-		}
 
 		final TypeInformation<Row> outputType = Types.ROW_NAMED(schema.getColumnNames(), schema.getTypes());
-		// determine gateway address (and port if possible)
-		final InetAddress gatewayAddress = getGatewayAddress(env.getDeployment());
-		final int gatewayPort = getGatewayPort(env.getDeployment());
 
-		if (env.getExecution().isChangelogMode()) {
-			return new ChangelogCollectStreamResult<>(outputType, config, gatewayAddress, gatewayPort);
+		if (env.getExecution().isStreamingExecution()) {
+			// determine gateway address (and port if possible)
+			final InetAddress gatewayAddress = getGatewayAddress(env.getDeployment());
+			final int gatewayPort = getGatewayPort(env.getDeployment());
+
+			if (env.getExecution().isChangelogMode()) {
+				return new ChangelogCollectStreamResult<>(outputType, config, gatewayAddress, gatewayPort);
+			} else {
+				return new MaterializedCollectStreamResult<>(outputType, config, gatewayAddress, gatewayPort);
+			}
+
 		} else {
-			return new MaterializedCollectStreamResult<>(outputType, config, gatewayAddress, gatewayPort);
+			// batch execution
+			if (!env.getExecution().isTableMode()) {
+				throw new SqlExecutionException("Results of batch queries can only be served in table mode.");
+			}
+			return new MaterializedCollectBatchResult<>(outputType, config);
 		}
 	}
 

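After this change, createResult() dispatches on both the execution type and the
result mode:

  - streaming + changelog: ChangelogCollectStreamResult
  - streaming + table:     MaterializedCollectStreamResult
  - batch     + table:     MaterializedCollectBatchResult
  - batch     + changelog: SqlExecutionException, since a batch result is
    computed in one shot and has no changelog to stream
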
http://git-wip-us.apache.org/repos/asf/flink/blob/e8e74a64/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java b/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java
index 48cc648..64b526d 100644
--- a/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java
+++ b/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java
@@ -114,6 +114,7 @@ public class LocalExecutorITCase extends TestLogger {
 		executor.getSessionProperties(session);
 
 		// modify defaults
+		session.setSessionProperty("execution.type", "streaming");
 		session.setSessionProperty("execution.result-mode", "table");
 
 		final Map<String, String> actualProperties = executor.getSessionProperties(session);
@@ -146,13 +147,14 @@ public class LocalExecutorITCase extends TestLogger {
 	}
 
 	@Test(timeout = 30_000L)
-	public void testQueryExecutionChangelog() throws Exception {
+	public void testStreamQueryExecutionChangelog() throws Exception {
 		final URL url = getClass().getClassLoader().getResource("test-data.csv");
 		Objects.requireNonNull(url);
 		final Map<String, String> replaceVars = new HashMap<>();
 		replaceVars.put("$VAR_0", url.getPath());
 		replaceVars.put("$VAR_1", "/");
-		replaceVars.put("$VAR_2", "changelog");
+		replaceVars.put("$VAR_2", "streaming");
+		replaceVars.put("$VAR_3", "changelog");
 
 		final Executor executor = createModifiedExecutor(clusterClient, replaceVars);
 		final SessionContext session = new SessionContext("test-session", new Environment());
@@ -193,13 +195,14 @@ public class LocalExecutorITCase extends TestLogger {
 	}
 
 	@Test(timeout = 30_000L)
-	public void testQueryExecutionTable() throws Exception {
+	public void testStreamQueryExecutionTable() throws Exception {
 		final URL url = getClass().getClassLoader().getResource("test-data.csv");
 		Objects.requireNonNull(url);
 		final Map<String, String> replaceVars = new HashMap<>();
 		replaceVars.put("$VAR_0", url.getPath());
 		replaceVars.put("$VAR_1", "/");
-		replaceVars.put("$VAR_2", "table");
+		replaceVars.put("$VAR_2", "streaming");
+		replaceVars.put("$VAR_3", "table");
 
 		final Executor executor = createModifiedExecutor(clusterClient, replaceVars);
 		final SessionContext session = new SessionContext("test-session", new Environment());
@@ -210,22 +213,41 @@ public class LocalExecutorITCase extends TestLogger {
 
 			assertTrue(desc.isMaterialized());
 
-			final List<String> actualResults = new ArrayList<>();
+			final List<String> actualResults = retrieveTableResult(executor, session, desc.getResultId());
 
-			while (true) {
-				Thread.sleep(50); // slow the processing down
-				final TypedResult<Integer> result = executor.snapshotResult(session, desc.getResultId(), 2);
-				if (result.getType() == TypedResult.ResultType.PAYLOAD) {
-					actualResults.clear();
-					IntStream.rangeClosed(1, result.getPayload()).forEach((page) -> {
-						for (Row row : executor.retrieveResultPage(desc.getResultId(), page)) {
-							actualResults.add(row.toString());
-						}
-					});
-				} else if (result.getType() == TypedResult.ResultType.EOS) {
-					break;
-				}
-			}
+			final List<String> expectedResults = new ArrayList<>();
+			expectedResults.add("42");
+			expectedResults.add("22");
+			expectedResults.add("32");
+			expectedResults.add("32");
+			expectedResults.add("42");
+			expectedResults.add("52");
+
+			TestBaseUtils.compareResultCollections(expectedResults, actualResults, Comparator.naturalOrder());
+		} finally {
+			executor.stop(session);
+		}
+	}
+
+	@Test(timeout = 30_000L)
+	public void testBatchQueryExecution() throws Exception {
+		final URL url = getClass().getClassLoader().getResource("test-data.csv");
+		Objects.requireNonNull(url);
+		final Map<String, String> replaceVars = new HashMap<>();
+		replaceVars.put("$VAR_0", url.getPath());
+		replaceVars.put("$VAR_1", "/");
+		replaceVars.put("$VAR_2", "batch");
+		replaceVars.put("$VAR_3", "table");
+
+		final Executor executor = createModifiedExecutor(clusterClient, replaceVars);
+		final SessionContext session = new SessionContext("test-session", new Environment());
+
+		try {
+			final ResultDescriptor desc = executor.executeQuery(session, "SELECT IntegerField1 FROM TableNumber1");
+
+			assertTrue(desc.isMaterialized());
+
+			final List<String> actualResults = retrieveTableResult(executor, session, desc.getResultId());
 
 			final List<String> expectedResults = new ArrayList<>();
 			expectedResults.add("42");
@@ -256,4 +278,28 @@ public class LocalExecutorITCase extends TestLogger {
 			clusterClient.getFlinkConfiguration(),
 			new DummyCustomCommandLine<T>(clusterClient));
 	}
+
+	private List<String> retrieveTableResult(
+			Executor executor,
+			SessionContext session,
+			String resultID) throws InterruptedException {
+
+		final List<String> actualResults = new ArrayList<>();
+		while (true) {
+			Thread.sleep(50); // slow the processing down
+			final TypedResult<Integer> result = executor.snapshotResult(session, resultID, 2);
+			if (result.getType() == TypedResult.ResultType.PAYLOAD) {
+				actualResults.clear();
+				IntStream.rangeClosed(1, result.getPayload()).forEach((page) -> {
+					for (Row row : executor.retrieveResultPage(resultID, page)) {
+						actualResults.add(row.toString());
+					}
+				});
+			} else if (result.getType() == TypedResult.ResultType.EOS) {
+				break;
+			}
+		}
+
+		return actualResults;
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/e8e74a64/flink-libraries/flink-sql-client/src/test/resources/test-sql-client-defaults.yaml
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-sql-client/src/test/resources/test-sql-client-defaults.yaml b/flink-libraries/flink-sql-client/src/test/resources/test-sql-client-defaults.yaml
index 1186615..3c21ffb 100644
--- a/flink-libraries/flink-sql-client/src/test/resources/test-sql-client-defaults.yaml
+++ b/flink-libraries/flink-sql-client/src/test/resources/test-sql-client-defaults.yaml
@@ -64,13 +64,13 @@ tables:
       comment-prefix: "#"
 
 execution:
-  type: streaming
+  type: "$VAR_2"
   time-characteristic: event-time
   parallelism: 1
   max-parallelism: 16
   min-idle-state-retention: 0
   max-idle-state-retention: 0
-  result-mode: "$VAR_2"
+  result-mode: "$VAR_3"
 
 deployment:
   response-timeout: 5000