Posted to commits@flink.apache.org by dw...@apache.org on 2019/07/05 13:34:35 UTC
[flink] 01/03: [FLINK-13088][table-api] Support lazy query transformation & execution on TableEnvironment
This is an automated email from the ASF dual-hosted git repository.
dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 177e3a69a701ff2dc217b039e06f008af7d75d43
Author: Dawid Wysakowicz <dw...@apache.org>
AuthorDate: Mon Jun 17 11:44:08 2019 +0200
[FLINK-13088][table-api] Support lazy query transformation & execution on TableEnvironment
---
flink-python/pyflink/table/table_environment.py | 47 +++++++++++
.../table/tests/test_environment_completeness.py | 3 +-
.../table/client/gateway/local/LocalExecutor.java | 8 +-
.../table/api/java/BatchTableEnvironment.java | 41 ++++++++++
.../table/api/java/StreamTableEnvironment.java | 61 ++++++++++++++
.../java/internal/StreamTableEnvironmentImpl.java | 38 ++++++---
.../apache/flink/table/api/BatchQueryConfig.java | 3 +
.../org/apache/flink/table/api/QueryConfig.java | 3 +
.../apache/flink/table/api/StreamQueryConfig.java | 13 +++
.../org/apache/flink/table/api/TableConfig.java | 58 +++++++++++++
.../apache/flink/table/api/TableEnvironment.java | 61 +++++---------
.../table/api/internal/QueryConfigProvider.java | 41 ----------
.../table/api/internal/TableEnvironmentImpl.java | 95 ++++++++++++++--------
.../apache/flink/table/api/internal/TableImpl.java | 11 ++-
.../table/api/scala/BatchTableEnvironment.scala | 62 ++++++++++++++
.../table/api/scala/StreamTableEnvironment.scala | 63 ++++++++++++++
.../internal/StreamTableEnvironmentImpl.scala | 31 ++++++-
.../table/api/internal/BatchTableEnvImpl.scala | 37 +++------
.../flink/table/api/internal/TableEnvImpl.scala | 30 ++-----
.../java/internal/BatchTableEnvironmentImpl.scala | 18 +++-
.../scala/internal/BatchTableEnvironmentImpl.scala | 12 ++-
.../apache/flink/table/planner/StreamPlanner.scala | 25 +++---
.../sql/validation/InsertIntoValidationTest.scala | 2 +-
.../harness/GroupAggregateHarnessTest.scala | 18 ++--
.../harness/TableAggregateHarnessTest.scala | 28 ++++---
.../flink/table/utils/MockTableEnvironment.scala | 9 +-
26 files changed, 593 insertions(+), 225 deletions(-)
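For illustration, a minimal sketch (not part of this patch) of the usage pattern the change enables. It assumes the tables "sourceTable", "sinkTable1" and "sinkTable2" were registered beforehand: against the unified TableEnvironment, sqlUpdate() only buffers the INSERTs, and execute() triggers translation and submission of a single job.

import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class LazyExecutionSketch {
    public static void main(String[] args) throws Exception {
        EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().build();
        TableEnvironment tEnv = TableEnvironment.create(settings);
        // Register "sourceTable", "sinkTable1" and "sinkTable2" here (omitted).
        tEnv.sqlUpdate("INSERT INTO sinkTable1 SELECT * FROM sourceTable");
        tEnv.sqlUpdate("INSERT INTO sinkTable2 SELECT * FROM sourceTable");
        // Nothing has been translated yet; both INSERTs are only buffered.
        JobExecutionResult result = tEnv.execute("lazy-example"); // translate + submit one job
    }
}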
diff --git a/flink-python/pyflink/table/table_environment.py b/flink-python/pyflink/table/table_environment.py
index 19b5199..3c8b44c 100644
--- a/flink-python/pyflink/table/table_environment.py
+++ b/flink-python/pyflink/table/table_environment.py
@@ -446,6 +446,25 @@ class TableEnvironment(object):
"""
pass
+ def execute(self, job_name):
+ """
+ Triggers the program execution. The environment will execute all parts of
+ the program.
+
+ The program execution will be logged and displayed with the provided name.
+
+ .. note::
+     It is highly advised to set all parameters in the :class:`TableConfig`
+     at the very beginning of the program. It is undefined which configuration
+     values will be used for the execution if queries are mixed with config
+     changes. It depends on the characteristics of the particular parameter.
+     For some of them the value from the point in time of query construction
+     (e.g. the currentCatalog) will be used. Other values are evaluated
+     according to the state from the time when this method is called
+     (e.g. the timeZone).
+
+ :param job_name: Desired name of the job
+ """
+ self._j_tenv.execute(job_name)
+
def from_elements(self, elements, schema=None, verify_schema=True):
"""
Creates a table from a collection of elements.
@@ -623,6 +642,20 @@ class StreamTableEnvironment(TableEnvironment):
return StreamTableDescriptor(
self._j_tenv.connect(connector_descriptor._j_connector_descriptor))
+ def execute(self, job_name):
+ """
+ Triggers the program execution. The environment will execute all parts of
+ the program.
+
+ The program execution will be logged and displayed with the provided name.
+
+ It calls ``StreamExecutionEnvironment#execute`` on the underlying
+ :class:`StreamExecutionEnvironment`. This environment translates queries eagerly.
+
+ :param job_name: Desired name of the job
+ """
+ self._j_tenv.execute(job_name)
+
@staticmethod
def create(stream_execution_environment, table_config=None):
"""
@@ -710,6 +743,20 @@ class BatchTableEnvironment(TableEnvironment):
return BatchTableDescriptor(
self._j_tenv.connect(connector_descriptor._j_connector_descriptor))
+ def execute(self, job_name):
+ """
+ Triggers the program execution. The environment will execute all parts of
+ the program.
+
+ The program execution will be logged and displayed with the provided name.
+
+ It calls ``ExecutionEnvironment#execute`` on the underlying
+ :class:`ExecutionEnvironment`. This environment translates queries eagerly.
+
+ :param job_name: Desired name of the job
+ """
+ self._j_tenv.execute(job_name)
+
@staticmethod
def create(execution_environment, table_config=None):
"""
diff --git a/flink-python/pyflink/table/tests/test_environment_completeness.py b/flink-python/pyflink/table/tests/test_environment_completeness.py
index a8fc991..1f82445 100644
--- a/flink-python/pyflink/table/tests/test_environment_completeness.py
+++ b/flink-python/pyflink/table/tests/test_environment_completeness.py
@@ -41,9 +41,10 @@ class EnvironmentAPICompletenessTests(PythonAPICompletenessTestCase, unittest.Te
# registerExternalCatalog, getRegisteredExternalCatalog, registerCatalog, getCatalog and
# listTables should be supported when catalog supported in python.
# getCompletionHints has been deprecated. It will be removed in the next release.
+ # TODO add TableEnvironment#create method with EnvironmentSettings as a parameter
return {'registerExternalCatalog', 'getRegisteredExternalCatalog', 'registerCatalog',
'getCatalog', 'registerFunction', 'listUserDefinedFunctions', 'listTables',
- 'getCompletionHints'}
+ 'getCompletionHints', 'create'}
if __name__ == '__main__':
diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
index 5eabbfe..af128ef 100644
--- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
+++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
@@ -35,10 +35,12 @@ import org.apache.flink.core.fs.Path;
import org.apache.flink.core.plugin.PluginUtils;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.table.api.QueryConfig;
+import org.apache.flink.table.api.StreamQueryConfig;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.internal.TableEnvImpl;
+import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.calcite.FlinkTypeFactory;
import org.apache.flink.table.client.SqlClientException;
import org.apache.flink.table.client.config.Environment;
@@ -509,7 +511,11 @@ public class LocalExecutor implements Executor {
// parse and validate statement
try {
context.wrapClassLoader(() -> {
- tableEnv.sqlUpdate(updateStatement, queryConfig);
+ if (tableEnv instanceof StreamTableEnvironment) {
+ ((StreamTableEnvironment) tableEnv).sqlUpdate(updateStatement, (StreamQueryConfig) queryConfig);
+ } else {
+ tableEnv.sqlUpdate(updateStatement);
+ }
return null;
});
} catch (Throwable t) {
diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/java/BatchTableEnvironment.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/java/BatchTableEnvironment.java
index 16f63af..7b7692d 100644
--- a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/java/BatchTableEnvironment.java
+++ b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/java/BatchTableEnvironment.java
@@ -33,6 +33,7 @@ import org.apache.flink.table.descriptors.BatchTableDescriptor;
import org.apache.flink.table.descriptors.ConnectorDescriptor;
import org.apache.flink.table.functions.AggregateFunction;
import org.apache.flink.table.functions.TableFunction;
+import org.apache.flink.table.sinks.TableSink;
import java.lang.reflect.Constructor;
@@ -211,6 +212,46 @@ public interface BatchTableEnvironment extends TableEnvironment {
<T> DataSet<T> toDataSet(Table table, TypeInformation<T> typeInfo, BatchQueryConfig queryConfig);
/**
+ * Evaluates a SQL statement such as INSERT, UPDATE or DELETE, or a DDL statement.
+ * NOTE: Currently only SQL INSERT statements are supported.
+ *
+ * <p>All tables referenced by the query must be registered in the TableEnvironment.
+ * A {@link Table} is automatically registered when its {@link Table#toString()} method is
+ * called, for example when it is embedded into a String.
+ * Hence, SQL queries can directly reference a {@link Table} as follows:
+ *
+ * <pre>
+ * {@code
+ * // register the configured table sink into which the result is inserted.
+ * tEnv.registerTableSink("sinkTable", configuredSink);
+ * Table sourceTable = ...
+ * String tableName = sourceTable.toString();
+ * // sourceTable is not registered to the table environment
+ * tEnv.sqlUpdate("INSERT INTO sinkTable SELECT * FROM " + tableName, config);
+ * }
+ * </pre>
+ *
+ * @param stmt The SQL statement to evaluate.
+ * @param config The {@link BatchQueryConfig} to use.
+ */
+ void sqlUpdate(String stmt, BatchQueryConfig config);
+
+ /**
+ * Writes the {@link Table} to a {@link TableSink} that was registered under the specified name.
+ *
+ * <p>See the documentation of {@link TableEnvironment#useDatabase(String)} or
+ * {@link TableEnvironment#useCatalog(String)} for the rules on the path resolution.
+ *
+ * @param table The Table to write to the sink.
+ * @param queryConfig The {@link BatchQueryConfig} to use.
+ * @param sinkPath The first part of the path of the registered {@link TableSink} to which the {@link Table} is
+ * written. This is to ensure at least the name of the {@link TableSink} is provided.
+ * @param sinkPathContinued The remaining part of the path of the registered {@link TableSink} to which the
+ * {@link Table} is written.
+ */
+ void insertInto(Table table, BatchQueryConfig queryConfig, String sinkPath, String... sinkPathContinued);
+
+ /**
* Creates a table source and/or table sink from a descriptor.
*
* <p>Descriptors allow for declaring the communication to external systems in an
diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/java/StreamTableEnvironment.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/java/StreamTableEnvironment.java
index 6859780..a18f3d4 100644
--- a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/java/StreamTableEnvironment.java
+++ b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/java/StreamTableEnvironment.java
@@ -19,11 +19,13 @@
package org.apache.flink.table.api.java;
import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.QueryConfig;
import org.apache.flink.table.api.StreamQueryConfig;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableConfig;
@@ -34,6 +36,7 @@ import org.apache.flink.table.descriptors.StreamTableDescriptor;
import org.apache.flink.table.functions.AggregateFunction;
import org.apache.flink.table.functions.TableAggregateFunction;
import org.apache.flink.table.functions.TableFunction;
+import org.apache.flink.table.sinks.TableSink;
/**
* This table environment is the entry point and central context for creating Table & SQL
@@ -448,4 +451,62 @@ public interface StreamTableEnvironment extends TableEnvironment {
*/
@Override
StreamTableDescriptor connect(ConnectorDescriptor connectorDescriptor);
+
+ /**
+ * Evaluates a SQL statement such as INSERT, UPDATE or DELETE, or a DDL statement.
+ * NOTE: Currently only SQL INSERT statements are supported.
+ *
+ * <p>All tables referenced by the query must be registered in the TableEnvironment.
+ * A {@link Table} is automatically registered when its {@link Table#toString()} method is
+ * called, for example when it is embedded into a String.
+ * Hence, SQL queries can directly reference a {@link Table} as follows:
+ *
+ * <pre>
+ * {@code
+ * // register the configured table sink into which the result is inserted.
+ * tEnv.registerTableSink("sinkTable", configuredSink);
+ * Table sourceTable = ...
+ * String tableName = sourceTable.toString();
+ * // sourceTable is not registered to the table environment
+ * tEnv.sqlUpdate("INSERT INTO sinkTable SELECT * FROM " + tableName, config);
+ * }
+ * </pre>
+ *
+ * @param stmt The SQL statement to evaluate.
+ * @param config The {@link StreamQueryConfig} to use.
+ */
+ void sqlUpdate(String stmt, StreamQueryConfig config);
+
+ /**
+ * Writes the {@link Table} to a {@link TableSink} that was registered under the specified name.
+ *
+ * <p>See the documentation of {@link TableEnvironment#useDatabase(String)} or
+ * {@link TableEnvironment#useCatalog(String)} for the rules on the path resolution.
+ *
+ * @param table The Table to write to the sink.
+ * @param queryConfig The {@link StreamQueryConfig} to use.
+ * @param sinkPath The first part of the path of the registered {@link TableSink} to which the {@link Table} is
+ * written. This is to ensure at least the name of the {@link TableSink} is provided.
+ * @param sinkPathContinued The remaining part of the path of the registered {@link TableSink} to which the
+ * {@link Table} is written.
+ */
+ void insertInto(Table table, StreamQueryConfig queryConfig, String sinkPath, String... sinkPathContinued);
+
+ /**
+ * Triggers the program execution. The environment will execute all parts of
+ * the program.
+ *
+ * <p>The program execution will be logged and displayed with the provided name.
+ *
+ * <p>It calls {@link StreamExecutionEnvironment#execute(String)} on the underlying
+ * {@link StreamExecutionEnvironment}. In contrast to the {@link TableEnvironment}, this
+ * environment translates queries eagerly. Therefore, the values in a {@link QueryConfig}
+ * parameter are ignored.
+ *
+ * @param jobName Desired name of the job
+ * @return The result of the job execution, containing elapsed time and accumulators.
+ * @throws Exception which occurs during job execution.
+ */
+ @Override
+ JobExecutionResult execute(String jobName) throws Exception;
}
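To contrast with the lazy behavior of the base TableEnvironment, a hypothetical snippet (not part of this patch) for the streaming bridge environment, where translation is eager and execute() only delegates to the underlying StreamExecutionEnvironment. It assumes "sourceTable" and "sinkTable" were registered beforehand.

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.java.StreamTableEnvironment;

public class EagerExecutionSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
        // Register "sourceTable" and "sinkTable" here (omitted).
        tEnv.sqlUpdate("INSERT INTO sinkTable SELECT * FROM sourceTable"); // translated immediately
        tEnv.execute("eager-example"); // delegates to env.execute("eager-example")
    }
}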
diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/java/internal/StreamTableEnvironmentImpl.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/java/internal/StreamTableEnvironmentImpl.java
index 05815dd..4dde3d6 100644
--- a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/java/internal/StreamTableEnvironmentImpl.java
+++ b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/java/internal/StreamTableEnvironmentImpl.java
@@ -20,6 +20,7 @@ package org.apache.flink.table.api.java.internal;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.api.java.tuple.Tuple2;
@@ -91,14 +92,6 @@ public final class StreamTableEnvironmentImpl extends TableEnvironmentImpl imple
this.executionEnvironment = executionEnvironment;
}
- /**
- * Creates an instance of a {@link StreamTableEnvironment}. It uses the {@link StreamExecutionEnvironment} for
- * executing queries. This is also the {@link StreamExecutionEnvironment} that will be used when converting
- * from/to {@link DataStream}.
- *
- * @param tableConfig The configuration of the TableEnvironment.
- * @param executionEnvironment The {@link StreamExecutionEnvironment} of the TableEnvironment.
- */
public static StreamTableEnvironment create(
StreamExecutionEnvironment executionEnvironment,
EnvironmentSettings settings,
@@ -241,7 +234,9 @@ public final class StreamTableEnvironmentImpl extends TableEnvironmentImpl imple
table.getQueryOperation(),
TypeConversions.fromLegacyInfoToDataType(typeInfo),
OutputConversionModifyOperation.UpdateMode.APPEND);
- queryConfigProvider.setConfig(queryConfig);
+ tableConfig.setIdleStateRetentionTime(
+ Time.milliseconds(queryConfig.getMinIdleStateRetentionTime()),
+ Time.milliseconds(queryConfig.getMaxIdleStateRetentionTime()));
return toDataStream(table, modifyOperation);
}
@@ -273,7 +268,9 @@ public final class StreamTableEnvironmentImpl extends TableEnvironmentImpl imple
table.getQueryOperation(),
wrapWithChangeFlag(typeInfo),
OutputConversionModifyOperation.UpdateMode.RETRACT);
- queryConfigProvider.setConfig(queryConfig);
+ tableConfig.setIdleStateRetentionTime(
+ Time.milliseconds(queryConfig.getMinIdleStateRetentionTime()),
+ Time.milliseconds(queryConfig.getMaxIdleStateRetentionTime()));
return toDataStream(table, modifyOperation);
}
@@ -282,6 +279,22 @@ public final class StreamTableEnvironmentImpl extends TableEnvironmentImpl imple
return (StreamTableDescriptor) super.connect(connectorDescriptor);
}
+ @Override
+ public void sqlUpdate(String stmt, StreamQueryConfig config) {
+ tableConfig.setIdleStateRetentionTime(
+ Time.milliseconds(config.getMinIdleStateRetentionTime()),
+ Time.milliseconds(config.getMaxIdleStateRetentionTime()));
+ sqlUpdate(stmt);
+ }
+
+ @Override
+ public void insertInto(Table table, StreamQueryConfig queryConfig, String sinkPath, String... sinkPathContinued) {
+ tableConfig.setIdleStateRetentionTime(
+ Time.milliseconds(queryConfig.getMinIdleStateRetentionTime()),
+ Time.milliseconds(queryConfig.getMaxIdleStateRetentionTime()));
+ insertInto(table, sinkPath, sinkPathContinued);
+ }
+
/**
* This is a temporary workaround for Python API. Python API should not use StreamExecutionEnvironment at all.
*/
@@ -305,6 +318,11 @@ public final class StreamTableEnvironmentImpl extends TableEnvironmentImpl imple
validateTimeCharacteristic(TableSourceValidation.hasRowtimeAttribute(tableSource));
}
+ @Override
+ protected boolean isEagerOperationTranslation() {
+ return true;
+ }
+
private <T> TypeInformation<T> extractTypeInformation(Table table, Class<T> clazz) {
try {
return TypeExtractor.createTypeInfo(clazz);
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/BatchQueryConfig.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/BatchQueryConfig.java
index a7cac52..c70d948 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/BatchQueryConfig.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/BatchQueryConfig.java
@@ -22,7 +22,10 @@ import org.apache.flink.annotation.PublicEvolving;
/**
* The {@link BatchQueryConfig} holds parameters to configure the behavior of batch queries.
+ *
+ * @deprecated Set the configuration on {@link TableConfig}.
*/
+@Deprecated
@PublicEvolving
public class BatchQueryConfig implements QueryConfig {
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/QueryConfig.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/QueryConfig.java
index 99cdb41..d330b45 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/QueryConfig.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/QueryConfig.java
@@ -24,8 +24,11 @@ import java.io.Serializable;
/**
* The {@link QueryConfig} holds parameters to configure the behavior of queries.
+ *
+ * @deprecated Set the configuration on {@link TableConfig}.
*/
@PublicEvolving
+@Deprecated
public interface QueryConfig extends Serializable {
}
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/StreamQueryConfig.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/StreamQueryConfig.java
index 86c0c75..8c774ff 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/StreamQueryConfig.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/StreamQueryConfig.java
@@ -18,12 +18,16 @@
package org.apache.flink.table.api;
+import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.time.Time;
/**
* The {@link StreamQueryConfig} holds parameters to configure the behavior of streaming queries.
+ *
+ * @deprecated Set the configuration on {@link TableConfig}.
*/
+@Deprecated
@PublicEvolving
public class StreamQueryConfig implements QueryConfig {
@@ -39,6 +43,15 @@ public class StreamQueryConfig implements QueryConfig {
*/
private long maxIdleStateRetentionTime = 0L;
+ @Internal
+ public StreamQueryConfig(long minIdleStateRetentionTime, long maxIdleStateRetentionTime) {
+ this.minIdleStateRetentionTime = minIdleStateRetentionTime;
+ this.maxIdleStateRetentionTime = maxIdleStateRetentionTime;
+ }
+
+ public StreamQueryConfig() {
+ }
+
/**
* Specifies a minimum and a maximum time interval for how long idle state, i.e., state which
* was not updated, will be retained.
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableConfig.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableConfig.java
index 325732f..8bdc461 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableConfig.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableConfig.java
@@ -19,6 +19,7 @@
package org.apache.flink.table.api;
import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.time.Time;
import org.apache.flink.table.api.internal.CompositePlannerConfig;
import org.apache.flink.util.Preconditions;
@@ -59,6 +60,18 @@ public class TableConfig {
private Integer maxGeneratedCodeLength = 64000; // just an estimate
/**
+ * The minimum time for which idle state (state that was not updated) is retained.
+ * State might be cleared and removed once it was not updated for this period of time.
+ */
+ private long minIdleStateRetentionTime = 0L;
+
+ /**
+ * The maximum time for which idle state (state that was not updated) is retained.
+ * State will be cleared and removed once it was not updated for this period of time.
+ */
+ private long maxIdleStateRetentionTime = 0L;
+
+ /**
* Returns the timezone for date/time/timestamp conversions.
*/
public TimeZone getTimeZone() {
@@ -135,6 +148,51 @@ public class TableConfig {
this.maxGeneratedCodeLength = Preconditions.checkNotNull(maxGeneratedCodeLength);
}
+ /**
+ * Specifies a minimum and a maximum time interval for how long idle state, i.e., state which
+ * was not updated, will be retained.
+ * State is never cleared while it has been idle for less than the minimum time, and is never
+ * kept once it has been idle for more than the maximum time.
+ *
+ * <p>When new data arrives for previously cleaned-up state, the new data will be handled as if it
+ * was the first data. This can result in previous results being overwritten.
+ *
+ * <p>Set both to 0 (zero) to never clean up the state.
+ *
+ * <p>NOTE: Cleaning up state requires additional bookkeeping which becomes less expensive for
+ * larger differences of minTime and maxTime. The difference between minTime and maxTime must be
+ * at least 5 minutes.
+ *
+ * @param minTime The minimum time interval for which idle state is retained. Set to 0 (zero) to
+ * never clean up the state.
+ * @param maxTime The maximum time interval for which idle state is retained. Must be at least
+ * 5 minutes greater than minTime. Set to 0 (zero) to never clean up the state.
+ */
+ public void setIdleStateRetentionTime(Time minTime, Time maxTime) {
+ if (maxTime.toMilliseconds() - minTime.toMilliseconds() < 300000 &&
+ !(maxTime.toMilliseconds() == 0 && minTime.toMilliseconds() == 0)) {
+ throw new IllegalArgumentException(
+ "Difference between minTime: " + minTime.toString() + " and maxTime: " + maxTime.toString() +
+ "shoud be at least 5 minutes.");
+ }
+ minIdleStateRetentionTime = minTime.toMilliseconds();
+ maxIdleStateRetentionTime = maxTime.toMilliseconds();
+ }
+
+ /**
+ * @return The minimum time for which idle state is retained.
+ */
+ public long getMinIdleStateRetentionTime() {
+ return minIdleStateRetentionTime;
+ }
+
+ /**
+ * @return The maximum time for which idle state is retained.
+ */
+ public long getMaxIdleStateRetentionTime() {
+ return maxIdleStateRetentionTime;
+ }
+
public static TableConfig getDefault() {
return new TableConfig();
}
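A small hypothetical snippet (not part of this patch) exercising the new setter and its validation rule; the class name is invented for illustration.

import org.apache.flink.api.common.time.Time;
import org.apache.flink.table.api.TableConfig;

public class IdleStateRetentionSketch {
    public static void main(String[] args) {
        TableConfig config = TableConfig.getDefault();
        // Valid: the difference between maxTime and minTime is at least 5 minutes.
        config.setIdleStateRetentionTime(Time.hours(12), Time.hours(24));
        // Valid: 0/0 disables state clean-up entirely.
        config.setIdleStateRetentionTime(Time.milliseconds(0), Time.milliseconds(0));
        // Invalid: difference below 5 minutes would throw IllegalArgumentException.
        // config.setIdleStateRetentionTime(Time.minutes(1), Time.minutes(2));
    }
}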
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java
index 85d20f6..686e816 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java
@@ -19,6 +19,7 @@
package org.apache.flink.table.api;
import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.catalog.ExternalCatalog;
@@ -188,21 +189,6 @@ public interface TableEnvironment {
* {@link TableEnvironment#useCatalog(String)} for the rules on the path resolution.
*
* @param table The Table to write to the sink.
- * @param queryConfig The {@link QueryConfig} to use.
- * @param sinkPath The first part of the path of the registered {@link TableSink} to which the {@link Table} is
- * written. This is to ensure at least the name of the {@link TableSink} is provided.
- * @param sinkPathContinued The remaining part of the path of the registered {@link TableSink} to which the
- * {@link Table} is written.
- */
- void insertInto(Table table, QueryConfig queryConfig, String sinkPath, String... sinkPathContinued);
-
- /**
- * Writes the {@link Table} to a {@link TableSink} that was registered under the specified name.
- *
- * <p>See the documentation of {@link TableEnvironment#useDatabase(String)} or
- * {@link TableEnvironment#useCatalog(String)} for the rules on the path resolution.
- *
- * @param table The Table to write to the sink.
* @param sinkPath The first part of the path of the registered {@link TableSink} to which the {@link Table} is
* written. This is to ensure at least the name of the {@link TableSink} is provided.
* @param sinkPathContinued The remaining part of the path of the registered {@link TableSink} to which the
@@ -336,31 +322,6 @@ public interface TableEnvironment {
void sqlUpdate(String stmt);
/**
- * Evaluates a SQL statement such as INSERT, UPDATE or DELETE; or a DDL statement;
- * NOTE: Currently only SQL INSERT statements are supported.
- *
- * <p>All tables referenced by the query must be registered in the TableEnvironment.
- * A {@link Table} is automatically registered when its {@link Table#toString()} method is
- * called, for example when it is embedded into a String.
- * Hence, SQL queries can directly reference a {@link Table} as follows:
- *
- * <pre>
- * {@code
- * // register the configured table sink into which the result is inserted.
- * tEnv.registerTableSink("sinkTable", configuredSink);
- * Table sourceTable = ...
- * String tableName = sourceTable.toString();
- * // sourceTable is not registered to the table environment
- * tEnv.sqlUpdate(s"INSERT INTO sinkTable SELECT * FROM tableName", config);
- * }
- * </pre>
- *
- * @param stmt The SQL statement to evaluate.
- * @param config The {@link QueryConfig} to use.
- */
- void sqlUpdate(String stmt, QueryConfig config);
-
- /**
* Gets the current default catalog name of the current session.
*
* @return The current default catalog name that is used for the path resolution.
@@ -496,4 +457,24 @@ public interface TableEnvironment {
* Returns the table config that defines the runtime behavior of the Table API.
*/
TableConfig getConfig();
+
+ /**
+ * Triggers the program execution. The environment will execute all parts of
+ * the program.
+ *
+ * <p>The program execution will be logged and displayed with the provided name.
+ *
+ * <p><b>NOTE:</b> It is highly advised to set all parameters in the {@link TableConfig}
+ * at the very beginning of the program. It is undefined which configuration values will
+ * be used for the execution if queries are mixed with config changes. It depends on
+ * the characteristics of the particular parameter. For some of them the value from the
+ * point in time of query construction (e.g. the currentCatalog) will be used. Other
+ * values are evaluated according to the state from the time when this method is
+ * called (e.g. the timeZone).
+ *
+ * @param jobName Desired name of the job
+ * @return The result of the job execution, containing elapsed time and accumulators.
+ * @throws Exception which occurs during job execution.
+ */
+ JobExecutionResult execute(String jobName) throws Exception;
}
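Given the note above, a hedged sketch of the recommended ordering under lazy translation: configuration first, then queries, then execute(). Table registration is omitted and the table names are hypothetical.

import org.apache.flink.api.common.time.Time;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class ConfigOrderingSketch {
    public static void main(String[] args) throws Exception {
        TableEnvironment tEnv = TableEnvironment.create(
            EnvironmentSettings.newInstance().inStreamingMode().build());
        // Set all TableConfig parameters before constructing any query.
        tEnv.getConfig().setIdleStateRetentionTime(Time.hours(1), Time.hours(2));
        tEnv.sqlUpdate("INSERT INTO sinkTable SELECT * FROM sourceTable"); // buffered
        tEnv.execute("config-ordering-example"); // translation happens here
    }
}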
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/QueryConfigProvider.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/QueryConfigProvider.java
deleted file mode 100644
index 171354f..0000000
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/QueryConfigProvider.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.api.internal;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.table.api.PlannerConfig;
-import org.apache.flink.table.api.StreamQueryConfig;
-import org.apache.flink.table.delegation.Planner;
-
-/**
- * An adapter to {@link PlannerConfig} that enables to pass {@link org.apache.flink.table.api.QueryConfig}
- * to {@link Planner} via {@link org.apache.flink.table.api.TableConfig}.
- */
-@Internal
-public class QueryConfigProvider implements PlannerConfig {
- private StreamQueryConfig config;
-
- public StreamQueryConfig getConfig() {
- return config;
- }
-
- public void setConfig(StreamQueryConfig config) {
- this.config = config;
- }
-}
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
index e94c65a..b2c786a 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
@@ -20,11 +20,10 @@ package org.apache.flink.table.api.internal;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.table.api.CatalogNotExistException;
-import org.apache.flink.table.api.QueryConfig;
-import org.apache.flink.table.api.StreamQueryConfig;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.api.TableEnvironment;
@@ -76,12 +75,12 @@ public class TableEnvironmentImpl implements TableEnvironment {
private final String defaultCatalogName;
private final String defaultDatabaseName;
- private final TableConfig tableConfig;
private final OperationTreeBuilder operationTreeBuilder;
+ private final List<ModifyOperation> bufferedModifyOperations = new ArrayList<>();
+ protected final TableConfig tableConfig;
protected final Executor execEnv;
protected final FunctionCatalog functionCatalog;
- protected final QueryConfigProvider queryConfigProvider = new QueryConfigProvider();
protected final Planner planner;
protected TableEnvironmentImpl(
@@ -95,7 +94,6 @@ public class TableEnvironmentImpl implements TableEnvironment {
this.execEnv = executor;
this.tableConfig = tableConfig;
- this.tableConfig.addPlannerConfig(queryConfigProvider);
this.defaultCatalogName = catalogManager.getCurrentCatalog();
this.defaultDatabaseName = catalogManager.getCurrentDatabase();
@@ -266,31 +264,24 @@ public class TableEnvironmentImpl implements TableEnvironment {
}
@Override
- public void sqlUpdate(String stmt) {
- sqlUpdate(stmt, new StreamQueryConfig());
- }
-
- @Override
- public void insertInto(Table table, QueryConfig queryConfig, String path, String... pathContinued) {
+ public void insertInto(Table table, String path, String... pathContinued) {
List<String> fullPath = new ArrayList<>(Arrays.asList(pathContinued));
fullPath.add(0, path);
- queryConfigProvider.setConfig((StreamQueryConfig) queryConfig);
- List<Transformation<?>> translate = planner.translate(Collections.singletonList(
+ List<ModifyOperation> modifyOperations = Collections.singletonList(
new CatalogSinkModifyOperation(
fullPath,
- table.getQueryOperation())));
-
- execEnv.apply(translate);
- }
+ table.getQueryOperation()));
- @Override
- public void insertInto(Table table, String path, String... pathContinued) {
- insertInto(table, new StreamQueryConfig(), path, pathContinued);
+ if (isEagerOperationTranslation()) {
+ translate(modifyOperations);
+ } else {
+ buffer(modifyOperations);
+ }
}
@Override
- public void sqlUpdate(String stmt, QueryConfig config) {
+ public void sqlUpdate(String stmt) {
List<Operation> operations = planner.parse(stmt);
if (operations.size() != 1) {
@@ -301,11 +292,12 @@ public class TableEnvironmentImpl implements TableEnvironment {
Operation operation = operations.get(0);
if (operation instanceof ModifyOperation) {
- queryConfigProvider.setConfig((StreamQueryConfig) config);
- List<Transformation<?>> transformations =
- planner.translate(Collections.singletonList((ModifyOperation) operation));
-
- execEnv.apply(transformations);
+ List<ModifyOperation> modifyOperations = Collections.singletonList((ModifyOperation) operation);
+ if (isEagerOperationTranslation()) {
+ translate(modifyOperations);
+ } else {
+ buffer(modifyOperations);
+ }
} else {
throw new TableException(
"Unsupported SQL query! sqlUpdate() only accepts a single SQL statements of type INSERT.");
@@ -337,6 +329,48 @@ public class TableEnvironmentImpl implements TableEnvironment {
return tableConfig;
}
+ @Override
+ public JobExecutionResult execute(String jobName) throws Exception {
+ translate(bufferedModifyOperations);
+ bufferedModifyOperations.clear();
+ return execEnv.execute(jobName);
+ }
+
+ /**
+ * Defines the behavior of this {@link TableEnvironment}. If true, queries are
+ * translated immediately. If false, the {@link ModifyOperation}s are buffered
+ * and translated only when {@link #execute(String)} is called.
+ *
+ * <p>If the {@link TableEnvironment} works in a lazy manner, it is undefined which
+ * configuration values will be used. It depends on the characteristics of the particular
+ * parameter. Some might use values current at the time of query construction (e.g. the
+ * currentCatalog) and some use values from the time when {@link #execute(String)} is
+ * called (e.g. the timeZone).
+ *
+ * @return true if the queries should be translated immediately.
+ */
+ protected boolean isEagerOperationTranslation() {
+ return false;
+ }
+
+ /**
+ * Subclasses can override this method to add additional checks.
+ *
+ * @param tableSource tableSource to validate
+ */
+ protected void validateTableSource(TableSource<?> tableSource) {
+ TableSourceValidation.validateTableSource(tableSource);
+ }
+
+ private void translate(List<ModifyOperation> modifyOperations) {
+ List<Transformation<?>> transformations = planner.translate(modifyOperations);
+
+ execEnv.apply(transformations);
+ }
+
+ private void buffer(List<ModifyOperation> modifyOperations) {
+ bufferedModifyOperations.addAll(modifyOperations);
+ }
+
protected void registerTableInternal(String name, CatalogBaseTable table) {
try {
checkValidTableName(name);
@@ -374,15 +408,6 @@ public class TableEnvironmentImpl implements TableEnvironment {
}
}
- /**
- * Subclasses can override this method to add additional checks.
- *
- * @param tableSource tableSource to validate
- */
- protected void validateTableSource(TableSource<?> tableSource) {
- TableSourceValidation.validateTableSource(tableSource);
- }
-
private void registerTableSourceInternal(String name, TableSource<?> tableSource) {
validateTableSource(tableSource);
Optional<CatalogBaseTable> table = getCatalogTable(defaultCatalogName, defaultDatabaseName, name);
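The isEagerOperationTranslation() hook above is a template method: the base class buffers by default and subclasses opt into eager translation. Below is a self-contained, simplified sketch of that strategy; the names are invented and this is not the actual Flink code.

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

abstract class BufferingEnvSketch<OP> {
    private final List<OP> buffered = new ArrayList<>();

    // Subclasses return true to translate each operation immediately.
    protected boolean isEagerOperationTranslation() {
        return false;
    }

    void accept(OP operation) {
        if (isEagerOperationTranslation()) {
            translate(Collections.singletonList(operation)); // eager path
        } else {
            buffered.add(operation); // lazy path: defer until execute()
        }
    }

    void execute(String jobName) {
        translate(buffered); // translate everything buffered so far
        buffered.clear();
        // ...submit the job under jobName...
    }

    protected abstract void translate(List<OP> operations);
}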
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableImpl.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableImpl.java
index ae08d24..41e483b 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableImpl.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableImpl.java
@@ -19,6 +19,7 @@
package org.apache.flink.table.api.internal;
import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.time.Time;
import org.apache.flink.table.api.AggregatedTable;
import org.apache.flink.table.api.FlatAggregateTable;
import org.apache.flink.table.api.GroupWindow;
@@ -27,6 +28,7 @@ import org.apache.flink.table.api.GroupedTable;
import org.apache.flink.table.api.OverWindow;
import org.apache.flink.table.api.OverWindowedTable;
import org.apache.flink.table.api.QueryConfig;
+import org.apache.flink.table.api.StreamQueryConfig;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableException;
@@ -416,7 +418,14 @@ public class TableImpl implements Table {
@Override
public void insertInto(QueryConfig conf, String tablePath, String... tablePathContinued) {
- tableEnvironment.insertInto(this, conf, tablePath, tablePathContinued);
+ if (conf instanceof StreamQueryConfig) {
+ StreamQueryConfig streamQueryConfig = (StreamQueryConfig) conf;
+ tableEnvironment.getConfig().setIdleStateRetentionTime(
+ Time.milliseconds(streamQueryConfig.getMinIdleStateRetentionTime()),
+ Time.milliseconds(streamQueryConfig.getMaxIdleStateRetentionTime())
+ );
+ }
+ tableEnvironment.insertInto(this, tablePath, tablePathContinued);
}
@Override
diff --git a/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/BatchTableEnvironment.scala b/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/BatchTableEnvironment.scala
index fa6b178..6b3ecbc 100644
--- a/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/BatchTableEnvironment.scala
+++ b/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/BatchTableEnvironment.scala
@@ -17,6 +17,7 @@
*/
package org.apache.flink.table.api.scala
+import org.apache.flink.api.common.JobExecutionResult
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment}
import org.apache.flink.table.api.{TableEnvironment, _}
@@ -24,6 +25,7 @@ import org.apache.flink.table.catalog.{CatalogManager, GenericInMemoryCatalog}
import org.apache.flink.table.descriptors.{BatchTableDescriptor, ConnectorDescriptor}
import org.apache.flink.table.expressions.Expression
import org.apache.flink.table.functions.{AggregateFunction, TableFunction}
+import org.apache.flink.table.sinks.TableSink
/**
* The [[TableEnvironment]] for a Scala batch [[ExecutionEnvironment]] that works
@@ -155,6 +157,66 @@ trait BatchTableEnvironment extends TableEnvironment {
queryConfig: BatchQueryConfig): DataSet[T]
/**
+ * Evaluates a SQL statement such as INSERT, UPDATE or DELETE, or a DDL statement.
+ * NOTE: Currently only SQL INSERT statements are supported.
+ *
+ * All tables referenced by the query must be registered in the TableEnvironment.
+ * A [[Table]] is automatically registered when its [[Table#toString()]] method is
+ * called, for example when it is embedded into a String.
+ * Hence, SQL queries can directly reference a [[Table]] as follows:
+ *
+ * {{{
+ * // register the configured table sink into which the result is inserted.
+ * tEnv.registerTableSink("sinkTable", configuredSink)
+ * val sourceTable: Table = ...
+ * val tableName = sourceTable.toString
+ * // sourceTable is not registered to the table environment
+ * tEnv.sqlUpdate(s"INSERT INTO sinkTable SELECT * FROM $tableName", config)
+ * }}}
+ *
+ * @param stmt The SQL statement to evaluate.
+ * @param config The [[BatchQueryConfig]] to use.
+ */
+ def sqlUpdate(stmt: String, config: BatchQueryConfig): Unit
+
+ /**
+ * Writes the [[Table]] to a [[TableSink]] that was registered under the specified name.
+ *
+ * See the documentation of [[TableEnvironment.useDatabase]] or
+ * [[TableEnvironment.useCatalog]] for the rules on the path resolution.
+ *
+ * @param table The Table to write to the sink.
+ * @param queryConfig The [[BatchQueryConfig]] to use.
+ * @param sinkPath The first part of the path of the registered [[TableSink]] to
+ * which the [[Table]] is written. This is to ensure at least the
+ * name of the [[TableSink]] is provided.
+ * @param sinkPathContinued The remaining part of the path of the registered [[TableSink]] to
+ * which the [[Table]] is written.
+ */
+ def insertInto(
+ table: Table,
+ queryConfig: BatchQueryConfig,
+ sinkPath: String,
+ sinkPathContinued: String*): Unit
+
+ /**
+ * Triggers the program execution. The environment will execute all parts of
+ * the program.
+ *
+ * The program execution will be logged and displayed with the provided name.
+ *
+ * It calls [[ExecutionEnvironment.execute]] on the underlying
+ * [[ExecutionEnvironment]]. In contrast to the [[TableEnvironment]], this
+ * environment translates queries eagerly.
+ *
+ * @param jobName Desired name of the job
+ * @return The result of the job execution, containing elapsed time and accumulators.
+ * @throws Exception which occurs during job execution.
+ */
+ @throws[Exception]
+ override def execute(jobName: String): JobExecutionResult
+
+ /**
* Creates a table source and/or table sink from a descriptor.
*
* Descriptors allow for declaring the communication to external systems in an
diff --git a/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/StreamTableEnvironment.scala b/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/StreamTableEnvironment.scala
index 3baa5fa..db88251 100644
--- a/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/StreamTableEnvironment.scala
+++ b/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/StreamTableEnvironment.scala
@@ -18,6 +18,7 @@
package org.apache.flink.table.api.scala
import org.apache.flink.annotation.PublicEvolving
+import org.apache.flink.api.common.JobExecutionResult
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl
@@ -25,6 +26,7 @@ import org.apache.flink.table.api.{TableEnvironment, _}
import org.apache.flink.table.descriptors.{ConnectorDescriptor, StreamTableDescriptor}
import org.apache.flink.table.expressions.Expression
import org.apache.flink.table.functions.{AggregateFunction, TableAggregateFunction, TableFunction}
+import org.apache.flink.table.sinks.TableSink
/**
* This table environment is the entry point and central context for creating Table & SQL
@@ -209,6 +211,67 @@ trait StreamTableEnvironment extends TableEnvironment {
table: Table,
queryConfig: StreamQueryConfig): DataStream[(Boolean, T)]
+
+ /**
+ * Evaluates a SQL statement such as INSERT, UPDATE or DELETE, or a DDL statement.
+ * NOTE: Currently only SQL INSERT statements are supported.
+ *
+ * All tables referenced by the query must be registered in the TableEnvironment.
+ * A [[Table]] is automatically registered when its [[Table#toString()]] method is
+ * called, for example when it is embedded into a String.
+ * Hence, SQL queries can directly reference a [[Table]] as follows:
+ *
+ * {{{
+ * // register the configured table sink into which the result is inserted.
+ * tEnv.registerTableSink("sinkTable", configuredSink)
+ * val sourceTable: Table = ...
+ * val tableName = sourceTable.toString
+ * // sourceTable is not registered to the table environment
+ * tEnv.sqlUpdate(s"INSERT INTO sinkTable SELECT * FROM $tableName", config)
+ * }}}
+ *
+ * @param stmt The SQL statement to evaluate.
+ * @param config The [[StreamQueryConfig]] to use.
+ */
+ def sqlUpdate(stmt: String, config: StreamQueryConfig): Unit
+
+ /**
+ * Writes the [[Table]] to a [[TableSink]] that was registered under the specified name.
+ *
+ * See the documentation of [[TableEnvironment.useDatabase]] or
+ * [[TableEnvironment.useCatalog]] for the rules on the path resolution.
+ *
+ * @param table The Table to write to the sink.
+ * @param queryConfig The [[StreamQueryConfig]] to use.
+ * @param sinkPath The first part of the path of the registered [[TableSink]] to
+ * which the [[Table]] is written. This is to ensure at least the name
+ * of the [[TableSink]] is provided.
+ * @param sinkPathContinued The remaining part of the path of the registered [[TableSink]] to
+ * which the [[Table]] is written.
+ */
+ def insertInto(
+ table: Table,
+ queryConfig: StreamQueryConfig,
+ sinkPath: String,
+ sinkPathContinued: String*): Unit
+
+ /**
+ * Triggers the program execution. The environment will execute all parts of
+ * the program.
+ *
+ * The program execution will be logged and displayed with the provided name.
+ *
+ * It calls [[StreamExecutionEnvironment.execute]] on the underlying
+ * [[StreamExecutionEnvironment]]. In contrast to the [[TableEnvironment]], this
+ * environment translates queries eagerly.
+ *
+ * @param jobName Desired name of the job
+ * @return The result of the job execution, containing elapsed time and accumulators.
+ * @throws Exception which occurs during job execution.
+ */
+ @throws[Exception]
+ override def execute(jobName: String): JobExecutionResult
+
/**
* Creates a table source and/or table sink from a descriptor.
*
diff --git a/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/internal/StreamTableEnvironmentImpl.scala b/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/internal/StreamTableEnvironmentImpl.scala
index 990f043..fedb317 100644
--- a/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/internal/StreamTableEnvironmentImpl.scala
+++ b/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/internal/StreamTableEnvironmentImpl.scala
@@ -18,6 +18,7 @@
package org.apache.flink.table.api.scala.internal
import org.apache.flink.annotation.Internal
+import org.apache.flink.api.common.time.Time
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.dag.Transformation
import org.apache.flink.api.scala._
@@ -102,7 +103,9 @@ class StreamTableEnvironmentImpl (
table.getQueryOperation,
TypeConversions.fromLegacyInfoToDataType(returnType),
OutputConversionModifyOperation.UpdateMode.APPEND)
- queryConfigProvider.setConfig(queryConfig)
+ tableConfig.setIdleStateRetentionTime(
+ Time.milliseconds(queryConfig.getMinIdleStateRetentionTime),
+ Time.milliseconds(queryConfig.getMaxIdleStateRetentionTime))
toDataStream(table, modifyOperation)
}
@@ -121,7 +124,9 @@ class StreamTableEnvironmentImpl (
TypeConversions.fromLegacyInfoToDataType(returnType),
OutputConversionModifyOperation.UpdateMode.RETRACT)
- queryConfigProvider.setConfig(queryConfig)
+ tableConfig.setIdleStateRetentionTime(
+ Time.milliseconds(queryConfig.getMinIdleStateRetentionTime),
+ Time.milliseconds(queryConfig.getMaxIdleStateRetentionTime))
toDataStream(table, modifyOperation)
}
@@ -182,6 +187,8 @@ class StreamTableEnvironmentImpl (
}
}
+ override protected def isEagerOperationTranslation(): Boolean = true
+
private def toDataStream[T](
table: Table,
modifyOperation: OutputConversionModifyOperation)
@@ -233,6 +240,26 @@ class StreamTableEnvironmentImpl (
typeInfoSchema.getIndices,
typeInfoSchema.toTableSchema)
}
+
+ override def sqlUpdate(stmt: String, config: StreamQueryConfig): Unit = {
+ tableConfig
+ .setIdleStateRetentionTime(
+ Time.milliseconds(config.getMinIdleStateRetentionTime),
+ Time.milliseconds(config.getMaxIdleStateRetentionTime))
+ sqlUpdate(stmt)
+ }
+
+ override def insertInto(
+ table: Table,
+ queryConfig: StreamQueryConfig,
+ sinkPath: String,
+ sinkPathContinued: String*): Unit = {
+ tableConfig
+ .setIdleStateRetentionTime(
+ Time.milliseconds(queryConfig.getMinIdleStateRetentionTime),
+ Time.milliseconds(queryConfig.getMaxIdleStateRetentionTime))
+ insertInto(table, sinkPath, sinkPathContinued: _*)
+ }
}
object StreamTableEnvironmentImpl {
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/BatchTableEnvImpl.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/BatchTableEnvImpl.scala
index b8425d7..eb0c543 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/BatchTableEnvImpl.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/BatchTableEnvImpl.scala
@@ -20,6 +20,7 @@ package org.apache.flink.table.api.internal
import org.apache.calcite.plan.RelOptUtil
import org.apache.calcite.rel.RelNode
+import org.apache.flink.api.common.JobExecutionResult
import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.io.DiscardingOutputFormat
@@ -65,8 +66,6 @@ abstract class BatchTableEnvImpl(
planningConfigurationBuilder
)
- override def queryConfig: BatchQueryConfig = new BatchQueryConfig
-
/**
* Registers an internal [[BatchTableSource]] in this [[TableEnvImpl]]'s catalog without
* name checking. Registered tables can be referenced in SQL queries.
@@ -103,34 +102,25 @@ abstract class BatchTableEnvImpl(
*
* @param table The [[Table]] to write.
* @param sink The [[TableSink]] to write the [[Table]] to.
- * @param queryConfig The configuration for the query to generate.
* @tparam T The expected type of the [[DataSet]] which represents the [[Table]].
*/
override private[flink] def writeToSink[T](
table: Table,
- sink: TableSink[T],
- queryConfig: QueryConfig): Unit = {
-
- // Check the query configuration to be a batch one.
- val batchQueryConfig = queryConfig match {
- case batchConfig: BatchQueryConfig => batchConfig
- case _ =>
- throw new TableException("BatchQueryConfig required to configure batch query.")
- }
+ sink: TableSink[T]): Unit = {
sink match {
case batchSink: BatchTableSink[T] =>
val outputType = fromDataTypeToLegacyInfo(sink.getConsumedDataType)
.asInstanceOf[TypeInformation[T]]
// translate the Table into a DataSet and provide the type that the TableSink expects.
- val result: DataSet[T] = translate(table, batchQueryConfig)(outputType)
+ val result: DataSet[T] = translate(table)(outputType)
// Give the DataSet to the TableSink to emit it.
batchSink.emitDataSet(result)
case boundedSink: OutputFormatTableSink[T] =>
val outputType = fromDataTypeToLegacyInfo(sink.getConsumedDataType)
.asInstanceOf[TypeInformation[T]]
// translate the Table into a DataSet and provide the type that the TableSink expects.
- val result: DataSet[T] = translate(table, batchQueryConfig)(outputType)
+ val result: DataSet[T] = translate(table)(outputType)
// use the OutputFormat to consume the DataSet.
val dataSink = result.output(boundedSink.getOutputFormat)
dataSink.name(
@@ -188,8 +178,7 @@ abstract class BatchTableEnvImpl(
val optimizedPlan = optimizer.optimize(ast)
val dataSet = translate[Row](
optimizedPlan,
- getTableSchema(table.getQueryOperation.getTableSchema.getFieldNames, optimizedPlan),
- queryConfig)(
+ getTableSchema(table.getQueryOperation.getTableSchema.getFieldNames, optimizedPlan))(
new GenericTypeInfo(classOf[Row]))
dataSet.output(new DiscardingOutputFormat[Row])
val env = dataSet.getExecutionEnvironment
@@ -211,6 +200,8 @@ abstract class BatchTableEnvImpl(
def explain(table: Table): String = explain(table: Table, extended = false)
+ override def execute(jobName: String): JobExecutionResult = execEnv.execute(jobName)
+
protected def asQueryOperation[T](dataSet: DataSet[T], fields: Option[Array[Expression]])
: DataSetQueryOperation[T] = {
val inputType = dataSet.getType
@@ -253,21 +244,17 @@ abstract class BatchTableEnvImpl(
* Table API calls and / or SQL queries and generating corresponding [[DataSet]] operators.
*
* @param table The root node of the relational expression tree.
- * @param queryConfig The configuration for the query to generate.
* @param tpe The [[TypeInformation]] of the resulting [[DataSet]].
* @tparam A The type of the resulting [[DataSet]].
* @return The [[DataSet]] that corresponds to the translated [[Table]].
*/
- protected def translate[A](
- table: Table,
- queryConfig: BatchQueryConfig)(implicit tpe: TypeInformation[A]): DataSet[A] = {
+ protected def translate[A](table: Table)(implicit tpe: TypeInformation[A]): DataSet[A] = {
val queryOperation = table.getQueryOperation
val relNode = getRelBuilder.tableOperation(queryOperation).build()
val dataSetPlan = optimizer.optimize(relNode)
translate(
dataSetPlan,
- getTableSchema(queryOperation.getTableSchema.getFieldNames, dataSetPlan),
- queryConfig)
+ getTableSchema(queryOperation.getTableSchema.getFieldNames, dataSetPlan))
}
/**
@@ -276,20 +263,18 @@ abstract class BatchTableEnvImpl(
* @param logicalPlan The root node of the relational expression tree.
* @param logicalType The row type of the result. Since the logicalPlan can lose the
* field naming during optimization we pass the row type separately.
- * @param queryConfig The configuration for the query to generate.
* @param tpe The [[TypeInformation]] of the resulting [[DataSet]].
* @tparam A The type of the resulting [[DataSet]].
* @return The [[DataSet]] that corresponds to the translated [[Table]].
*/
protected def translate[A](
logicalPlan: RelNode,
- logicalType: TableSchema,
- queryConfig: BatchQueryConfig)(implicit tpe: TypeInformation[A]): DataSet[A] = {
+ logicalType: TableSchema)(implicit tpe: TypeInformation[A]): DataSet[A] = {
validateInputTypeInfo(tpe)
logicalPlan match {
case node: DataSetRel =>
- val plan = node.translateToPlan(this, queryConfig)
+ val plan = node.translateToPlan(this, new BatchQueryConfig)
val conversion =
getConversionMapper(
plan.getType,
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
index 2d37173..24e52d7 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
@@ -94,11 +94,6 @@ abstract class TableEnvImpl(
case _ => false
}
- private[flink] def queryConfig: QueryConfig = this match {
- case _: BatchTableEnvImpl => new BatchQueryConfig
- case _ => null
- }
-
override def registerExternalCatalog(name: String, externalCatalog: ExternalCatalog): Unit = {
catalogManager.registerExternalCatalog(name, externalCatalog)
}
@@ -403,10 +398,6 @@ abstract class TableEnvImpl(
}
override def sqlUpdate(stmt: String): Unit = {
- sqlUpdate(stmt, this.queryConfig)
- }
-
- override def sqlUpdate(stmt: String, config: QueryConfig): Unit = {
val planner = getFlinkPlanner
// parse the sql query
val parsed = planner.parse(stmt)
@@ -424,7 +415,7 @@ abstract class TableEnvImpl(
val targetTablePath = insert.getTargetTable.asInstanceOf[SqlIdentifier].names
// insert query result into sink table
- insertInto(queryResult, config, targetTablePath.asScala:_*)
+ insertInto(queryResult, targetTablePath.asScala:_*)
case _ =>
throw new TableException(
"Unsupported SQL query! sqlUpdate() only accepts SQL statements of type INSERT.")
@@ -444,25 +435,15 @@ abstract class TableEnvImpl(
*
* @param table The [[Table]] to write.
* @param sink The [[TableSink]] to write the [[Table]] to.
- * @param conf The [[QueryConfig]] to use.
* @tparam T The data type that the [[TableSink]] expects.
*/
- private[flink] def writeToSink[T](table: Table, sink: TableSink[T], conf: QueryConfig): Unit
-
-
- override def insertInto(
- table: Table,
- queryConfig: QueryConfig,
- path: String,
- pathContinued: String*): Unit = {
- insertInto(table, queryConfig, path +: pathContinued: _*)
- }
+ private[flink] def writeToSink[T](table: Table, sink: TableSink[T]): Unit
override def insertInto(
table: Table,
path: String,
pathContinued: String*): Unit = {
- insertInto(table, queryConfig, path, pathContinued: _*)
+ insertInto(table, path +: pathContinued: _*)
}
/**
@@ -470,9 +451,8 @@ abstract class TableEnvImpl(
*
* @param table The table to write to the TableSink.
* @param sinkTablePath The name of the registered TableSink.
- * @param conf The query configuration to use.
*/
- private def insertInto(table: Table, conf: QueryConfig, sinkTablePath: String*): Unit = {
+ private def insertInto(table: Table, sinkTablePath: String*): Unit = {
// check that sink table exists
if (null == sinkTablePath) {
@@ -491,7 +471,7 @@ abstract class TableEnvImpl(
// validate schema of source table and table sink
TableSinkUtils.validateSink(table.getQueryOperation, sinkTablePath.asJava, tableSink)
// emit the table to the configured table sink
- writeToSink(table, tableSink, conf)
+ writeToSink(table, tableSink)
}
}
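
(The caller-facing consequence of dropping the QueryConfig overloads: retention is configured once on TableConfig and execution is triggered explicitly. A sketch of the new call pattern, assuming a StreamTableEnvironment tEnv, registered tables named clicks and sinkTable, and the setIdleStateRetentionTime setter that accompanies the getters this commit adds to TableConfig:)

    import org.apache.flink.api.common.time.Time

    // set once, up front, instead of passing a QueryConfig per call
    tEnv.getConfig.setIdleStateRetentionTime(Time.hours(1), Time.hours(2))

    tEnv.sqlUpdate("INSERT INTO sinkTable SELECT userId, COUNT(*) FROM clicks GROUP BY userId")
    tEnv.execute("click-counts") // lazy: nothing runs until execute() is called
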
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/java/internal/BatchTableEnvironmentImpl.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/java/internal/BatchTableEnvironmentImpl.scala
index 5ef9917..7358721 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/java/internal/BatchTableEnvironmentImpl.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/java/internal/BatchTableEnvironmentImpl.scala
@@ -68,26 +68,26 @@ class BatchTableEnvironmentImpl(
override def toDataSet[T](table: Table, clazz: Class[T]): DataSet[T] = {
// Use the default query config.
- translate[T](table, queryConfig)(TypeExtractor.createTypeInfo(clazz))
+ translate[T](table)(TypeExtractor.createTypeInfo(clazz))
}
override def toDataSet[T](table: Table, typeInfo: TypeInformation[T]): DataSet[T] = {
// Use the default batch query config.
- translate[T](table, queryConfig)(typeInfo)
+ translate[T](table)(typeInfo)
}
override def toDataSet[T](
table: Table,
clazz: Class[T],
queryConfig: BatchQueryConfig): DataSet[T] = {
- translate[T](table, queryConfig)(TypeExtractor.createTypeInfo(clazz))
+ translate[T](table)(TypeExtractor.createTypeInfo(clazz))
}
override def toDataSet[T](
table: Table,
typeInfo: TypeInformation[T],
queryConfig: BatchQueryConfig): DataSet[T] = {
- translate[T](table, queryConfig)(typeInfo)
+ translate[T](table)(typeInfo)
}
override def registerFunction[T](name: String, tf: TableFunction[T]): Unit = {
@@ -112,4 +112,14 @@ class BatchTableEnvironmentImpl(
registerAggregateFunctionInternal[T, ACC](name, f)
}
+
+ override def sqlUpdate(
+ stmt: String,
+ config: BatchQueryConfig): Unit = sqlUpdate(stmt)
+
+ override def insertInto(
+ table: Table,
+ queryConfig: BatchQueryConfig,
+ sinkPath: String,
+ sinkPathContinued: String*): Unit = insertInto(table, sinkPath, sinkPathContinued: _*)
}
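
(The BatchQueryConfig-typed overloads survive only for source compatibility; they delegate and drop the argument. A sketch, assuming a Java-API tEnv and a Table named table are in scope:)

    import org.apache.flink.api.java.DataSet
    import org.apache.flink.table.api.BatchQueryConfig
    import org.apache.flink.types.Row

    // Both calls now produce the same plan; the deprecated overload ignores its config.
    val rows: DataSet[Row] = tEnv.toDataSet(table, classOf[Row])
    val same: DataSet[Row] = tEnv.toDataSet(table, classOf[Row], new BatchQueryConfig)
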
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/scala/internal/BatchTableEnvironmentImpl.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/scala/internal/BatchTableEnvironmentImpl.scala
index 6c1d155..8e13426 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/scala/internal/BatchTableEnvironmentImpl.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/scala/internal/BatchTableEnvironmentImpl.scala
@@ -63,12 +63,12 @@ class BatchTableEnvironmentImpl(
override def toDataSet[T: TypeInformation](table: Table): DataSet[T] = {
// Use the default batch query config.
- wrap[T](translate(table, queryConfig))(ClassTag.AnyRef.asInstanceOf[ClassTag[T]])
+ wrap[T](translate(table))(ClassTag.AnyRef.asInstanceOf[ClassTag[T]])
}
override def toDataSet[T: TypeInformation](
table: Table, queryConfig: BatchQueryConfig): DataSet[T] = {
- wrap[T](translate(table, queryConfig))(ClassTag.AnyRef.asInstanceOf[ClassTag[T]])
+ wrap[T](translate(table))(ClassTag.AnyRef.asInstanceOf[ClassTag[T]])
}
override def registerFunction[T: TypeInformation](name: String, tf: TableFunction[T]): Unit = {
@@ -81,5 +81,13 @@ class BatchTableEnvironmentImpl(
: Unit = {
registerAggregateFunctionInternal[T, ACC](name, f)
}
+
+ override def sqlUpdate(stmt: String, config: BatchQueryConfig): Unit = sqlUpdate(stmt)
+
+ override def insertInto(
+ table: Table,
+ queryConfig: BatchQueryConfig,
+ sinkPath: String,
+ sinkPathContinued: String*): Unit = insertInto(table, sinkPath, sinkPathContinued: _*)
}
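
(Same pattern on the Scala side: the deprecated signatures become pure forwards. A sketch; tEnv and the tables snk/src are assumed:)

    import org.apache.flink.table.api.BatchQueryConfig

    // The deprecated overload now behaves identically to the plain one:
    tEnv.sqlUpdate("INSERT INTO snk SELECT * FROM src", new BatchQueryConfig)
    tEnv.sqlUpdate("INSERT INTO snk SELECT * FROM src")
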
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/StreamPlanner.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/StreamPlanner.scala
index 92465d8..8cd11fc 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/StreamPlanner.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/StreamPlanner.scala
@@ -17,23 +17,23 @@
*/
package org.apache.flink.table.planner
-import _root_.java.lang.{Boolean => JBool}
-import _root_.java.util.{Objects, List => JList}
-import java.util
-
-import org.apache.calcite.jdbc.CalciteSchema
-import org.apache.calcite.jdbc.CalciteSchemaBuilder.asRootSchema
-import org.apache.calcite.plan.RelOptUtil
-import org.apache.calcite.rel.RelNode
-import org.apache.calcite.sql.{SqlIdentifier, SqlInsert, SqlKind, SqlNode}
import org.apache.flink.annotation.VisibleForTesting
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.dag.Transformation
import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
import org.apache.flink.streaming.api.datastream.{DataStream, DataStreamSink}
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
+
+import org.apache.calcite.jdbc.CalciteSchema
+import org.apache.calcite.jdbc.CalciteSchemaBuilder.asRootSchema
+import org.apache.calcite.plan.RelOptUtil
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.sql.{SqlIdentifier, SqlInsert, SqlKind, SqlNode}
+
+import _root_.java.lang.{Boolean => JBool}
+import _root_.java.util.{Objects, List => JList}
+import java.util
import org.apache.flink.table.api._
-import org.apache.flink.table.api.internal.QueryConfigProvider
import org.apache.flink.table.calcite.{CalciteConfig, FlinkPlannerImpl, FlinkRelBuilder, FlinkTypeFactory}
import org.apache.flink.table.catalog.{CatalogManager, CatalogManagerCalciteSchema, CatalogTable, ConnectorCatalogTable, _}
import org.apache.flink.table.delegation.{Executor, Planner}
@@ -187,7 +187,10 @@ class StreamPlanner(
}
private def unwrapQueryConfig = {
- config.getPlannerConfig.unwrap(classOf[QueryConfigProvider]).get().getConfig
+ new StreamQueryConfig(
+ config.getMinIdleStateRetentionTime,
+ config.getMaxIdleStateRetentionTime
+ )
}
private def explain(tableOperation: QueryOperation, queryConfig: StreamQueryConfig) = {
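
(unwrapQueryConfig is the pivot of the change on the streaming side: instead of unwrapping a QueryConfigProvider from the planner config, the planner derives its StreamQueryConfig from the TableConfig it already holds, via the two-argument constructor this commit adds. The derivation in isolation; the setter call is an assumption mirroring the getters shown above:)

    import org.apache.flink.api.common.time.Time
    import org.apache.flink.table.api.{StreamQueryConfig, TableConfig}

    val config = new TableConfig
    config.setIdleStateRetentionTime(Time.hours(1), Time.hours(2)) // assumed setter

    // exactly what unwrapQueryConfig now does inside the planner
    val derived = new StreamQueryConfig(
      config.getMinIdleStateRetentionTime,
      config.getMaxIdleStateRetentionTime)
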
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/sql/validation/InsertIntoValidationTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/sql/validation/InsertIntoValidationTest.scala
index 55c3c31..faccc9f 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/sql/validation/InsertIntoValidationTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/sql/validation/InsertIntoValidationTest.scala
@@ -72,6 +72,6 @@ class InsertIntoValidationTest extends TableTestBase {
val sql = "INSERT INTO targetTable (d, f) SELECT a, c FROM sourceTable"
// must fail because partial insert is not supported yet.
- util.tableEnv.sqlUpdate(sql, util.tableEnv.queryConfig)
+ util.tableEnv.sqlUpdate(sql)
}
}
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/harness/GroupAggregateHarnessTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/harness/GroupAggregateHarnessTest.scala
index 86458b3..6d382f2 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/harness/GroupAggregateHarnessTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/harness/GroupAggregateHarnessTest.scala
@@ -19,19 +19,20 @@ package org.apache.flink.table.runtime.harness
import java.lang.{Integer => JInt, Long => JLong}
import java.util.concurrent.ConcurrentLinkedQueue
-
import org.apache.flink.api.common.time.Time
import org.apache.flink.api.common.typeinfo.BasicTypeInfo
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.operators.KeyedProcessOperator
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord
+import org.apache.flink.table.api.TableConfig
import org.apache.flink.table.api.scala._
import org.apache.flink.table.runtime.aggregate._
import org.apache.flink.table.runtime.harness.HarnessTestBase._
import org.apache.flink.table.runtime.types.CRow
import org.apache.flink.table.runtime.utils.JavaUserDefinedAggFunctions.{MultiArgCount, MultiArgSum}
import org.apache.flink.types.Row
+
import org.junit.Assert.assertTrue
import org.junit.Test
@@ -39,8 +40,13 @@ import scala.collection.mutable
class GroupAggregateHarnessTest extends HarnessTestBase {
- protected var queryConfig =
+ private val queryConfig =
new TestStreamQueryConfig(Time.seconds(2), Time.seconds(3))
+ private val tableConfig = new TableConfig {
+ override def getMinIdleStateRetentionTime: Long = Time.seconds(2).toMilliseconds
+
+ override def getMaxIdleStateRetentionTime: Long = Time.seconds(3).toMilliseconds
+ }
@Test
def testAggregate(): Unit = {
@@ -185,7 +191,7 @@ class GroupAggregateHarnessTest extends HarnessTestBase {
@Test
def testDistinctAggregateWithRetract(): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
- val tEnv = StreamTableEnvironment.create(env)
+ val tEnv = StreamTableEnvironment.create(env, tableConfig)
val data = new mutable.MutableList[(JLong, JInt)]
val t = env.fromCollection(data).toTable(tEnv, 'a, 'b)
@@ -202,7 +208,7 @@ class GroupAggregateHarnessTest extends HarnessTestBase {
|""".stripMargin)
val testHarness = createHarnessTester[String, CRow, CRow](
- sqlQuery.toRetractStream[Row](queryConfig), "groupBy")
+ sqlQuery.toRetractStream[Row], "groupBy")
testHarness.setStateBackend(getStateBackend)
testHarness.open()
@@ -273,7 +279,7 @@ class GroupAggregateHarnessTest extends HarnessTestBase {
@Test
def testDistinctAggregateWithDifferentArgumentOrder(): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
- val tEnv = StreamTableEnvironment.create(env)
+ val tEnv = StreamTableEnvironment.create(env, tableConfig)
val data = new mutable.MutableList[(JLong, JLong, JLong)]
val t = env.fromCollection(data).toTable(tEnv, 'a, 'b, 'c)
@@ -292,7 +298,7 @@ class GroupAggregateHarnessTest extends HarnessTestBase {
|""".stripMargin)
val testHarness = createHarnessTester[String, CRow, CRow](
- sqlQuery.toRetractStream[Row](queryConfig), "groupBy")
+ sqlQuery.toRetractStream[Row], "groupBy")
testHarness.setStateBackend(getStateBackend)
testHarness.open()
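
(The test pins the 2s/3s retention window by subclassing TableConfig and overriding the getters directly, sidestepping any validation a setter might apply. The same pattern generalizes into a small helper; a sketch, with the helper name an assumption and env assumed in scope:)

    import org.apache.flink.api.common.time.Time
    import org.apache.flink.table.api.TableConfig
    import org.apache.flink.table.api.scala.StreamTableEnvironment

    /** A TableConfig with a fixed idle-state retention window, as the harness tests use. */
    def retentionConfig(min: Time, max: Time): TableConfig = new TableConfig {
      override def getMinIdleStateRetentionTime: Long = min.toMilliseconds
      override def getMaxIdleStateRetentionTime: Long = max.toMilliseconds
    }

    val tEnv = StreamTableEnvironment.create(env, retentionConfig(Time.seconds(2), Time.seconds(3)))
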
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/harness/TableAggregateHarnessTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/harness/TableAggregateHarnessTest.scala
index 56d79d5..9519c3d 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/harness/TableAggregateHarnessTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/harness/TableAggregateHarnessTest.scala
@@ -17,33 +17,37 @@
*/
package org.apache.flink.table.runtime.harness
-import java.lang.{Integer => JInt}
-import java.util.concurrent.ConcurrentLinkedQueue
-
import org.apache.flink.api.common.time.Time
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord
+import org.apache.flink.table.api.TableConfig
import org.apache.flink.table.api.scala._
-import org.apache.flink.table.runtime.harness.HarnessTestBase._
import org.apache.flink.table.runtime.types.CRow
import org.apache.flink.table.utils.{Top3WithEmitRetractValue, Top3WithMapView}
import org.apache.flink.types.Row
+
import org.junit.Test
+import java.lang.{Integer => JInt}
+import java.util.concurrent.ConcurrentLinkedQueue
+
import scala.collection.mutable
class TableAggregateHarnessTest extends HarnessTestBase {
- protected var queryConfig =
- new TestStreamQueryConfig(Time.seconds(2), Time.seconds(3))
+ private val tableConfig = new TableConfig {
+ override def getMinIdleStateRetentionTime: Long = Time.seconds(2).toMilliseconds
+
+ override def getMaxIdleStateRetentionTime: Long = Time.seconds(3).toMilliseconds
+ }
val data = new mutable.MutableList[(Int, Int)]
@Test
def testTableAggregate(): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
- val tEnv = StreamTableEnvironment.create(env)
+ val tEnv = StreamTableEnvironment.create(env, tableConfig)
val top3 = new Top3WithMapView
tEnv.registerFunction("top3", top3)
@@ -54,7 +58,7 @@ class TableAggregateHarnessTest extends HarnessTestBase {
.select('a, 'b1, 'b2)
val testHarness = createHarnessTester[Int, CRow, CRow](
- resultTable.toRetractStream[Row](queryConfig), "groupBy: (a)")
+ resultTable.toRetractStream[Row], "groupBy: (a)")
testHarness.open()
@@ -107,7 +111,7 @@ class TableAggregateHarnessTest extends HarnessTestBase {
def testTableAggregateEmitRetractValueIncrementally(): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
- val tEnv = StreamTableEnvironment.create(env)
+ val tEnv = StreamTableEnvironment.create(env, tableConfig)
val top3 = new Top3WithEmitRetractValue
val source = env.fromCollection(data).toTable(tEnv, 'a, 'b)
@@ -117,7 +121,7 @@ class TableAggregateHarnessTest extends HarnessTestBase {
.select('a, 'b1, 'b2)
val testHarness = createHarnessTester[Int, CRow, CRow](
- resultTable.toRetractStream[Row](queryConfig), "groupBy: (a)")
+ resultTable.toRetractStream[Row], "groupBy: (a)")
testHarness.open()
@@ -160,7 +164,7 @@ class TableAggregateHarnessTest extends HarnessTestBase {
def testTableAggregateWithRetractInput(): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
- val tEnv = StreamTableEnvironment.create(env)
+ val tEnv = StreamTableEnvironment.create(env, tableConfig)
val top3 = new Top3WithMapView
tEnv.registerFunction("top3", top3)
@@ -172,7 +176,7 @@ class TableAggregateHarnessTest extends HarnessTestBase {
.select('b1, 'b2)
val testHarness = createHarnessTester[Int, CRow, CRow](
- resultTable.toRetractStream[Row](queryConfig), "select: (Top3WithMapView")
+ resultTable.toRetractStream[Row], "select: (Top3WithMapView")
testHarness.open()
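
(For context, the pipeline these hunks exercise pairs a registered Top3 table aggregate with a config-free retract conversion. A sketch reusing the test's imports, tEnv, and source; the flatAggregate line is inferred from the surrounding hunks, not shown in this diff:)

    val top3 = new Top3WithMapView
    tEnv.registerFunction("top3", top3)

    val resultTable = source
      .groupBy('a)
      .flatAggregate(top3('b) as ('b1, 'b2))
      .select('a, 'b1, 'b2)

    // retention now comes from the TableConfig passed at environment creation
    val stream = resultTable.toRetractStream[Row]
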
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/MockTableEnvironment.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/MockTableEnvironment.scala
index 09bbe77..8f138de 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/MockTableEnvironment.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/MockTableEnvironment.scala
@@ -20,6 +20,7 @@ package org.apache.flink.table.utils
import java.util.Optional
+import org.apache.flink.api.common.JobExecutionResult
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.table.api.{QueryConfig, Table, TableConfig, TableEnvironment}
import org.apache.flink.table.catalog.{Catalog, ExternalCatalog}
@@ -69,8 +70,6 @@ class MockTableEnvironment extends TableEnvironment {
override def sqlUpdate(stmt: String): Unit = ???
- override def sqlUpdate(stmt: String, config: QueryConfig): Unit = ???
-
override def getConfig: TableConfig = ???
override def registerCatalog(
@@ -89,12 +88,8 @@ class MockTableEnvironment extends TableEnvironment {
override def insertInto(
table: Table,
- queryConfig: QueryConfig,
sinkPath: String,
sinkPathContinued: String*): Unit = ???
- override def insertInto(
- table: Table,
- sinkPath: String,
- sinkPathContinued: String*): Unit = ???
+ override def execute(jobName: String): JobExecutionResult = ???
}
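
(Since execute now returns a JobExecutionResult, callers can inspect runtime statistics once the lazily built pipeline has run. A sketch; the job name is an assumption:)

    import org.apache.flink.api.common.JobExecutionResult

    val result: JobExecutionResult = tEnv.execute("my-pipeline")
    println(s"net runtime: ${result.getNetRuntime} ms")
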