You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by dw...@apache.org on 2019/07/17 11:51:08 UTC
[flink] branch release-1.9 updated: [FLINK-13168][table] Clarify
isBatch/isStreaming/isBounded flag in flink planner and blink planner
This is an automated email from the ASF dual-hosted git repository.
dwysakowicz pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.9 by this push:
new 8312dfa [FLINK-13168][table] Clarify isBatch/isStreaming/isBounded flag in flink planner and blink planner
8312dfa is described below
commit 8312dfa86d0b9670ec8b6caf0574cc0bc01c44e1
Author: godfrey he <go...@163.com>
AuthorDate: Wed Jul 17 19:49:35 2019 +0800
[FLINK-13168][table] Clarify isBatch/isStreaming/isBounded flag in flink planner and blink planner
---
.../java/internal/StreamTableEnvironmentImpl.java | 10 ++--
.../flink/table/api/EnvironmentSettings.java | 26 +++++-----
.../table/api/internal/TableEnvironmentImpl.java | 24 +++++----
.../flink/table/catalog/ConnectorCatalogTable.java | 2 +
.../operations/TableSourceQueryOperation.java | 1 +
.../operations/utils/OperationTreeBuilder.java | 8 +--
.../utils/factories/AggregateOperationFactory.java | 8 +--
.../utils/factories/SetOperationFactory.java | 8 +--
.../utils/factories/SortOperationFactory.java | 8 +--
.../factories/ComponentFactoryServiceTest.java | 4 +-
.../table/factories/utils/TestPlannerFactory.java | 2 +-
.../internal/StreamTableEnvironmentImpl.scala | 2 +-
.../flink/table/catalog/CatalogCalciteSchema.java | 7 ++-
.../table/catalog/CatalogManagerCalciteSchema.java | 7 ++-
.../flink/table/catalog/DatabaseCalciteSchema.java | 34 +++++++++----
.../flink/table/executor/BlinkExecutorFactory.java | 8 +--
.../table/operations/DataStreamQueryOperation.java | 2 +
.../operations/RichTableSourceQueryOperation.java | 13 +++--
.../flink/table/planner/BlinkPlannerFactory.java | 8 +--
.../PushFilterIntoTableSourceScanRule.scala | 2 +-
.../PushPartitionIntoTableSourceScanRule.scala | 2 +-
.../PushProjectIntoTableSourceScanRule.scala | 2 +-
.../table/plan/schema/TableSourceSinkTable.scala | 12 -----
.../flink/table/plan/schema/TableSourceTable.scala | 12 +++--
.../apache/flink/table/planner/BatchPlanner.scala | 2 +-
.../apache/flink/table/planner/PlannerBase.scala | 7 ++-
.../apache/flink/table/planner/StreamPlanner.scala | 2 +-
.../plan/nodes/resource/MockNodeTestBase.java | 26 +++++-----
.../parallelism/FinalParallelismSetterTest.java | 6 +--
.../parallelism/ShuffleStageGeneratorTest.java | 6 +--
.../flink/table/plan/batch/sql/TableSourceTest.xml | 16 ++++++
.../plan/nodes/resource/ExecNodeResourceTest.xml | 16 +++---
.../table/plan/stream/sql/TableSourceTest.xml | 32 ++++++++++++
.../flink/table/catalog/CatalogTableITCase.scala | 8 +--
.../flink/table/codegen/agg/AggTestBase.scala | 4 +-
.../codegen/agg/AggsHandlerCodeGeneratorTest.scala | 2 +-
.../table/codegen/agg/batch/BatchAggTestBase.scala | 2 +-
.../table/plan/batch/sql/TableSourceTest.scala | 42 ++++++++++++++--
.../plan/nodes/resource/ExecNodeResourceTest.scala | 18 +++----
.../table/plan/stream/sql/TableSourceTest.scala | 37 +++++++++++++-
.../apache/flink/table/util/TableTestBase.scala | 57 +++++++++++-----------
.../apache/flink/table/util/testTableSources.scala | 34 ++++++-------
.../flink/table/catalog/CatalogCalciteSchema.java | 8 +--
.../table/catalog/CatalogManagerCalciteSchema.java | 10 ++--
.../flink/table/catalog/DatabaseCalciteSchema.java | 8 +--
.../table/executor/StreamExecutorFactory.java | 2 +-
.../flink/table/planner/StreamPlannerFactory.java | 2 +-
.../flink/table/api/internal/TableEnvImpl.scala | 22 +++++----
.../table/catalog/ExternalCatalogSchema.scala | 11 +++--
.../flink/table/catalog/ExternalTableUtil.scala | 14 +++---
.../logical/FlinkLogicalTableSourceScan.scala | 2 +-
.../rules/dataSet/BatchTableSourceScanRule.scala | 2 +-
.../datastream/StreamTableSourceScanRule.scala | 2 +-
.../flink/table/plan/schema/TableSourceTable.scala | 8 ++-
.../apache/flink/table/planner/StreamPlanner.scala | 2 +-
.../table/catalog/DatabaseCalciteSchemaTest.java | 4 +-
.../table/catalog/ExternalCatalogSchemaTest.scala | 2 +-
57 files changed, 386 insertions(+), 242 deletions(-)
diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/java/internal/StreamTableEnvironmentImpl.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/java/internal/StreamTableEnvironmentImpl.java
index 4d0586c..2e35bb9 100644
--- a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/java/internal/StreamTableEnvironmentImpl.java
+++ b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/java/internal/StreamTableEnvironmentImpl.java
@@ -87,13 +87,13 @@ public final class StreamTableEnvironmentImpl extends TableEnvironmentImpl imple
StreamExecutionEnvironment executionEnvironment,
Planner planner,
Executor executor,
- boolean isStreaming) {
- super(catalogManager, tableConfig, executor, functionCatalog, planner, isStreaming);
+ boolean isStreamingMode) {
+ super(catalogManager, tableConfig, executor, functionCatalog, planner, isStreamingMode);
this.executionEnvironment = executionEnvironment;
- if (!isStreaming) {
+ if (!isStreamingMode) {
throw new TableException(
- "StreamTableEnvironment is not supported on batch mode now, please use TableEnvironment.");
+ "StreamTableEnvironment is not supported in batch mode now, please use TableEnvironment.");
}
}
@@ -122,7 +122,7 @@ public final class StreamTableEnvironmentImpl extends TableEnvironmentImpl imple
executionEnvironment,
planner,
executor,
- !settings.isBatchMode()
+ settings.isStreamingMode()
);
}
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/EnvironmentSettings.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/EnvironmentSettings.java
index 4367254..e3fc7cc 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/EnvironmentSettings.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/EnvironmentSettings.java
@@ -44,7 +44,7 @@ import java.util.Map;
*/
@PublicEvolving
public class EnvironmentSettings {
- public static final String BATCH_MODE = "batch-mode";
+ public static final String STREAMING_MODE = "streaming-mode";
public static final String CLASS_NAME = "class-name";
/**
@@ -70,22 +70,22 @@ public class EnvironmentSettings {
private final String builtInDatabaseName;
/**
- * Determines if the table environment should work in a batch ({@code true}) or
- * streaming ({@code false}) mode.
+ * Determines if the table environment should work in a batch ({@code false}) or
+ * streaming ({@code true}) mode.
*/
- private final boolean isBatchMode;
+ private final boolean isStreamingMode;
private EnvironmentSettings(
@Nullable String plannerClass,
@Nullable String executorClass,
String builtInCatalogName,
String builtInDatabaseName,
- boolean isBatchMode) {
+ boolean isStreamingMode) {
this.plannerClass = plannerClass;
this.executorClass = executorClass;
this.builtInCatalogName = builtInCatalogName;
this.builtInDatabaseName = builtInDatabaseName;
- this.isBatchMode = isBatchMode;
+ this.isStreamingMode = isStreamingMode;
}
/**
@@ -117,8 +117,8 @@ public class EnvironmentSettings {
/**
* Tells if the {@link TableEnvironment} should work in a batch or streaming mode.
*/
- public boolean isBatchMode() {
- return isBatchMode;
+ public boolean isStreamingMode() {
+ return isStreamingMode;
}
@Internal
@@ -141,7 +141,7 @@ public class EnvironmentSettings {
private Map<String, String> toCommonProperties() {
Map<String, String> properties = new HashMap<>();
- properties.put(BATCH_MODE, Boolean.toString(isBatchMode));
+ properties.put(STREAMING_MODE, Boolean.toString(isStreamingMode));
return properties;
}
@@ -153,7 +153,7 @@ public class EnvironmentSettings {
private String executorClass = null;
private String builtInCatalogName = "default_catalog";
private String builtInDatabaseName = "default_database";
- private boolean isBatchMode = false;
+ private boolean isStreamingMode = true;
/**
* Sets the old Flink planner as the required module. By default, {@link #useAnyPlanner()} is
@@ -192,7 +192,7 @@ public class EnvironmentSettings {
* Sets that the components should work in a batch mode. Streaming mode by default.
*/
public Builder inBatchMode() {
- this.isBatchMode = true;
+ this.isStreamingMode = false;
return this;
}
@@ -200,7 +200,7 @@ public class EnvironmentSettings {
* Sets that the components should work in a streaming mode. Enabled by default.
*/
public Builder inStreamingMode() {
- this.isBatchMode = false;
+ this.isStreamingMode = true;
return this;
}
@@ -231,7 +231,7 @@ public class EnvironmentSettings {
executorClass,
builtInCatalogName,
builtInDatabaseName,
- isBatchMode);
+ isStreamingMode);
}
}
}
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
index f2aea62..97d27f0 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
@@ -80,7 +80,9 @@ import java.util.stream.Collectors;
*/
@Internal
public class TableEnvironmentImpl implements TableEnvironment {
-
+ // Flag that tells if the TableSource/TableSink used in this environment is stream table source/sink,
+ // and this should always be true. This avoids too many hard code.
+ private static final boolean IS_STREAM_TABLE = true;
private final CatalogManager catalogManager;
private final String builtinCatalogName;
@@ -99,7 +101,7 @@ public class TableEnvironmentImpl implements TableEnvironment {
Executor executor,
FunctionCatalog functionCatalog,
Planner planner,
- boolean isStreaming) {
+ boolean isStreamingMode) {
this.catalogManager = catalogManager;
this.execEnv = executor;
@@ -117,7 +119,7 @@ public class TableEnvironmentImpl implements TableEnvironment {
Optional<CatalogQueryOperation> catalogTableOperation = scanInternal(path);
return catalogTableOperation.map(tableOperation -> new TableReferenceExpression(path, tableOperation));
},
- isStreaming
+ isStreamingMode
);
}
@@ -144,7 +146,7 @@ public class TableEnvironmentImpl implements TableEnvironment {
executor,
functionCatalog,
planner,
- !settings.isBatchMode()
+ settings.isStreamingMode()
);
}
@@ -155,7 +157,9 @@ public class TableEnvironmentImpl implements TableEnvironment {
@Override
public Table fromTableSource(TableSource<?> source) {
- return createTable(new TableSourceQueryOperation<>(source, false));
+ // only accept StreamTableSource and LookupableTableSource here
+ // TODO should add a validation, while StreamTableSource is in flink-table-api-java-bridge module now
+ return createTable(new TableSourceQueryOperation<>(source, !IS_STREAM_TABLE));
}
@Override
@@ -198,6 +202,8 @@ public class TableEnvironmentImpl implements TableEnvironment {
@Override
public void registerTableSource(String name, TableSource<?> tableSource) {
+ // only accept StreamTableSource and LookupableTableSource here
+ // TODO should add a validation, while StreamTableSource is in flink-table-api-java-bridge module now
registerTableSourceInternal(name, tableSource);
}
@@ -528,14 +534,14 @@ public class TableEnvironmentImpl implements TableEnvironment {
replaceTableInternal(
name,
ConnectorCatalogTable
- .sourceAndSink(tableSource, sourceSinkTable.getTableSink().get(), false));
+ .sourceAndSink(tableSource, sourceSinkTable.getTableSink().get(), !IS_STREAM_TABLE));
}
} else {
throw new ValidationException(String.format(
"Table '%s' already exists. Please choose a different name.", name));
}
} else {
- registerTableInternal(name, ConnectorCatalogTable.source(tableSource, false));
+ registerTableInternal(name, ConnectorCatalogTable.source(tableSource, !IS_STREAM_TABLE));
}
}
@@ -553,14 +559,14 @@ public class TableEnvironmentImpl implements TableEnvironment {
replaceTableInternal(
name,
ConnectorCatalogTable
- .sourceAndSink(sourceSinkTable.getTableSource().get(), tableSink, false));
+ .sourceAndSink(sourceSinkTable.getTableSource().get(), tableSink, !IS_STREAM_TABLE));
}
} else {
throw new ValidationException(String.format(
"Table '%s' already exists. Please choose a different name.", name));
}
} else {
- registerTableInternal(name, ConnectorCatalogTable.sink(tableSink, false));
+ registerTableInternal(name, ConnectorCatalogTable.sink(tableSink, !IS_STREAM_TABLE));
}
}
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/ConnectorCatalogTable.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/ConnectorCatalogTable.java
index 84b6e0d..a3f7bda 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/ConnectorCatalogTable.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/ConnectorCatalogTable.java
@@ -49,6 +49,8 @@ import java.util.stream.Collectors;
public class ConnectorCatalogTable<T1, T2> extends AbstractCatalogTable {
private final TableSource<T1> tableSource;
private final TableSink<T2> tableSink;
+ // Flag that tells if the tableSource/tableSink is BatchTableSource/BatchTableSink.
+ // NOTES: this should be false in BLINK planner, because BLINK planner always uses StreamTableSource.
private final boolean isBatch;
public static <T1> ConnectorCatalogTable source(TableSource<T1> source, boolean isBatch) {
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/TableSourceQueryOperation.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/TableSourceQueryOperation.java
index 097e92e..1644d0b 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/TableSourceQueryOperation.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/TableSourceQueryOperation.java
@@ -35,6 +35,7 @@ import java.util.Map;
public class TableSourceQueryOperation<T> implements QueryOperation {
private final TableSource<T> tableSource;
+ // Flag that tells if the tableSource is BatchTableSource.
private final boolean isBatch;
public TableSourceQueryOperation(TableSource<T> tableSource, boolean isBatch) {
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/utils/OperationTreeBuilder.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/utils/OperationTreeBuilder.java
index 7510cb9..a27aeba 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/utils/OperationTreeBuilder.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/utils/OperationTreeBuilder.java
@@ -122,15 +122,15 @@ public final class OperationTreeBuilder {
public static OperationTreeBuilder create(
FunctionLookup functionCatalog,
TableReferenceLookup tableReferenceLookup,
- boolean isStreaming) {
+ boolean isStreamingMode) {
return new OperationTreeBuilder(
functionCatalog,
tableReferenceLookup,
new ProjectionOperationFactory(),
- new SortOperationFactory(isStreaming),
+ new SortOperationFactory(isStreamingMode),
new CalculatedTableFactory(),
- new SetOperationFactory(isStreaming),
- new AggregateOperationFactory(isStreaming),
+ new SetOperationFactory(isStreamingMode),
+ new AggregateOperationFactory(isStreamingMode),
new JoinOperationFactory()
);
}
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/utils/factories/AggregateOperationFactory.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/utils/factories/AggregateOperationFactory.java
index 42c0f03..899bce5 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/utils/factories/AggregateOperationFactory.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/utils/factories/AggregateOperationFactory.java
@@ -89,14 +89,14 @@ import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.isTim
@Internal
public final class AggregateOperationFactory {
- private final boolean isStreaming;
+ private final boolean isStreamingMode;
private final NoNestedAggregates noNestedAggregates = new NoNestedAggregates();
private final ValidateDistinct validateDistinct = new ValidateDistinct();
private final AggregationExpressionValidator aggregationsValidator = new AggregationExpressionValidator();
private final IsKeyTypeChecker isKeyTypeChecker = new IsKeyTypeChecker();
- public AggregateOperationFactory(boolean isStreaming) {
- this.isStreaming = isStreaming;
+ public AggregateOperationFactory(boolean isStreamingMode) {
+ this.isStreamingMode = isStreamingMode;
}
/**
@@ -274,7 +274,7 @@ public final class AggregateOperationFactory {
}
private void validateTimeAttributeType(LogicalType timeFieldType) {
- if (isStreaming) {
+ if (isStreamingMode) {
validateStreamTimeAttribute(timeFieldType);
} else {
validateBatchTimeAttribute(timeFieldType);
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/utils/factories/SetOperationFactory.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/utils/factories/SetOperationFactory.java
index 9b54b78..0d206f9 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/utils/factories/SetOperationFactory.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/utils/factories/SetOperationFactory.java
@@ -37,10 +37,10 @@ import static org.apache.flink.table.operations.SetQueryOperation.SetQueryOperat
@Internal
public class SetOperationFactory {
- private final boolean isStreaming;
+ private final boolean isStreamingMode;
- public SetOperationFactory(boolean isStreaming) {
- this.isStreaming = isStreaming;
+ public SetOperationFactory(boolean isStreamingMode) {
+ this.isStreamingMode = isStreamingMode;
}
/**
@@ -98,7 +98,7 @@ public class SetOperationFactory {
private void failIfStreaming(SetQueryOperationType type, boolean all) {
boolean shouldFailInCaseOfStreaming = !all || type != UNION;
- if (isStreaming && shouldFailInCaseOfStreaming) {
+ if (isStreamingMode && shouldFailInCaseOfStreaming) {
throw new ValidationException(
format(
"The %s operation on two unbounded tables is currently not supported.",
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/utils/factories/SortOperationFactory.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/utils/factories/SortOperationFactory.java
index 765fe4f..a58db14 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/utils/factories/SortOperationFactory.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/utils/factories/SortOperationFactory.java
@@ -40,10 +40,10 @@ import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.ORDER_
@Internal
public class SortOperationFactory {
- private final boolean isStreaming;
+ private final boolean isStreamingMode;
- public SortOperationFactory(boolean isStreaming) {
- this.isStreaming = isStreaming;
+ public SortOperationFactory(boolean isStreamingMode) {
+ this.isStreamingMode = isStreamingMode;
}
/**
@@ -127,7 +127,7 @@ public class SortOperationFactory {
}
private void failIfStreaming() {
- if (isStreaming) {
+ if (isStreamingMode) {
throw new ValidationException("A limit operation on unbounded tables is currently not supported.");
}
}
diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/factories/ComponentFactoryServiceTest.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/factories/ComponentFactoryServiceTest.java
index 13e821d..e6befb5 100644
--- a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/factories/ComponentFactoryServiceTest.java
+++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/factories/ComponentFactoryServiceTest.java
@@ -45,7 +45,7 @@ public class ComponentFactoryServiceTest {
public void testLookingUpAmbiguousPlanners() {
Map<String, String> properties = new HashMap<>();
properties.put(EnvironmentSettings.CLASS_NAME, TestPlannerFactory.class.getCanonicalName());
- properties.put(EnvironmentSettings.BATCH_MODE, Boolean.toString(true));
+ properties.put(EnvironmentSettings.STREAMING_MODE, Boolean.toString(false));
properties.put(TestPlannerFactory.PLANNER_TYPE_KEY, TestPlannerFactory.PLANNER_TYPE_VALUE);
PlannerFactory plannerFactory = ComponentFactoryService.find(PlannerFactory.class, properties);
@@ -60,7 +60,7 @@ public class ComponentFactoryServiceTest {
Map<String, String> properties = new HashMap<>();
properties.put(EnvironmentSettings.CLASS_NAME, "NoSuchClass");
- properties.put(EnvironmentSettings.BATCH_MODE, Boolean.toString(true));
+ properties.put(EnvironmentSettings.STREAMING_MODE, Boolean.toString(false));
properties.put(TestPlannerFactory.PLANNER_TYPE_KEY, TestPlannerFactory.PLANNER_TYPE_VALUE);
ComponentFactoryService.find(PlannerFactory.class, properties);
diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/factories/utils/TestPlannerFactory.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/factories/utils/TestPlannerFactory.java
index 1a0f05f..6ace0df 100644
--- a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/factories/utils/TestPlannerFactory.java
+++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/factories/utils/TestPlannerFactory.java
@@ -64,6 +64,6 @@ public class TestPlannerFactory implements PlannerFactory {
@Override
public List<String> supportedProperties() {
- return Arrays.asList(EnvironmentSettings.CLASS_NAME, EnvironmentSettings.BATCH_MODE);
+ return Arrays.asList(EnvironmentSettings.CLASS_NAME, EnvironmentSettings.STREAMING_MODE);
}
}
diff --git a/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/internal/StreamTableEnvironmentImpl.scala b/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/internal/StreamTableEnvironmentImpl.scala
index 1304c8c..67021cb 100644
--- a/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/internal/StreamTableEnvironmentImpl.scala
+++ b/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/internal/StreamTableEnvironmentImpl.scala
@@ -300,7 +300,7 @@ object StreamTableEnvironmentImpl {
executionEnvironment,
planner,
executor,
- !settings.isBatchMode
+ settings.isStreamingMode
)
}
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/catalog/CatalogCalciteSchema.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/catalog/CatalogCalciteSchema.java
index e816c8f..678eca3 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/catalog/CatalogCalciteSchema.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/catalog/CatalogCalciteSchema.java
@@ -39,10 +39,13 @@ public class CatalogCalciteSchema extends FlinkSchema {
private final String catalogName;
private final Catalog catalog;
+ // Flag that tells if the current planner should work in a batch or streaming mode.
+ private final boolean isStreamingMode;
- public CatalogCalciteSchema(String catalogName, Catalog catalog) {
+ public CatalogCalciteSchema(String catalogName, Catalog catalog, boolean isStreamingMode) {
this.catalogName = catalogName;
this.catalog = catalog;
+ this.isStreamingMode = isStreamingMode;
}
/**
@@ -54,7 +57,7 @@ public class CatalogCalciteSchema extends FlinkSchema {
@Override
public Schema getSubSchema(String schemaName) {
if (catalog.databaseExists(schemaName)) {
- return new DatabaseCalciteSchema(schemaName, catalogName, catalog);
+ return new DatabaseCalciteSchema(schemaName, catalogName, catalog, isStreamingMode);
} else {
return null;
}
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/catalog/CatalogManagerCalciteSchema.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/catalog/CatalogManagerCalciteSchema.java
index c7238eb..7ca0a41 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/catalog/CatalogManagerCalciteSchema.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/catalog/CatalogManagerCalciteSchema.java
@@ -43,9 +43,12 @@ import java.util.Set;
public class CatalogManagerCalciteSchema extends FlinkSchema {
private final CatalogManager catalogManager;
+ // Flag that tells if the current planner should work in a batch or streaming mode.
+ private final boolean isStreamingMode;
- public CatalogManagerCalciteSchema(CatalogManager catalogManager) {
+ public CatalogManagerCalciteSchema(CatalogManager catalogManager, boolean isStreamingMode) {
this.catalogManager = catalogManager;
+ this.isStreamingMode = isStreamingMode;
}
@Override
@@ -61,7 +64,7 @@ public class CatalogManagerCalciteSchema extends FlinkSchema {
@Override
public Schema getSubSchema(String name) {
Schema schema = catalogManager.getCatalog(name)
- .map(catalog -> new CatalogCalciteSchema(name, catalog))
+ .map(catalog -> new CatalogCalciteSchema(name, catalog, isStreamingMode))
.orElse(null);
if (schema == null && catalogManager.getExternalCatalog(name).isPresent()) {
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/catalog/DatabaseCalciteSchema.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/catalog/DatabaseCalciteSchema.java
index 0ed45ba..cc90916 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/catalog/DatabaseCalciteSchema.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/catalog/DatabaseCalciteSchema.java
@@ -30,6 +30,7 @@ import org.apache.flink.table.operations.QueryOperation;
import org.apache.flink.table.operations.RichTableSourceQueryOperation;
import org.apache.flink.table.plan.schema.TableSourceTable;
import org.apache.flink.table.plan.stats.FlinkStatistic;
+import org.apache.flink.table.sources.LookupableTableSource;
import org.apache.flink.table.sources.StreamTableSource;
import org.apache.flink.table.sources.TableSource;
@@ -55,11 +56,14 @@ class DatabaseCalciteSchema extends FlinkSchema {
private final String databaseName;
private final String catalogName;
private final Catalog catalog;
+ // Flag that tells if the current planner should work in a batch or streaming mode.
+ private final boolean isStreamingMode;
- public DatabaseCalciteSchema(String databaseName, String catalogName, Catalog catalog) {
+ public DatabaseCalciteSchema(String databaseName, String catalogName, Catalog catalog, boolean isStreamingMode) {
this.databaseName = databaseName;
this.catalogName = catalogName;
this.catalog = catalog;
+ this.isStreamingMode = isStreamingMode;
}
@Override
@@ -96,20 +100,30 @@ class DatabaseCalciteSchema extends FlinkSchema {
// TableNotExistException should never happen, because we are checking it exists
// via catalog.tableExists
throw new TableException(format(
- "A failure occurred when accessing table. Table path [%s, %s, %s]",
- catalogName,
- databaseName,
- tableName), e);
+ "A failure occurred when accessing table. Table path [%s, %s, %s]",
+ catalogName,
+ databaseName,
+ tableName), e);
}
}
private Table convertConnectorTable(ConnectorCatalogTable<?, ?> table) {
return table.getTableSource()
- .map(tableSource -> new TableSourceTable<>(
- tableSource,
- !table.isBatch(),
- FlinkStatistic.UNKNOWN()))
- .orElseThrow(() -> new TableException("Cannot query a sink only table."));
+ .map(tableSource -> {
+ if (!(tableSource instanceof StreamTableSource ||
+ tableSource instanceof LookupableTableSource)) {
+ throw new TableException(
+ "Only StreamTableSource and LookupableTableSource can be used in Blink planner.");
+ }
+ if (!isStreamingMode && tableSource instanceof StreamTableSource &&
+ !((StreamTableSource<?>) tableSource).isBounded()) {
+ throw new TableException("Only bounded StreamTableSource can be used in batch mode.");
+ }
+ return new TableSourceTable<>(
+ tableSource,
+ isStreamingMode,
+ FlinkStatistic.UNKNOWN());
+ }).orElseThrow(() -> new TableException("Cannot query a sink only table."));
}
private Table convertCatalogTable(ObjectPath tablePath, CatalogTable table) {
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/executor/BlinkExecutorFactory.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/executor/BlinkExecutorFactory.java
index ac16cc8..855aa4c 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/executor/BlinkExecutorFactory.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/executor/BlinkExecutorFactory.java
@@ -49,10 +49,10 @@ public class BlinkExecutorFactory implements ExecutorFactory {
* @return instance of a {@link Executor}
*/
public Executor create(Map<String, String> properties, StreamExecutionEnvironment executionEnvironment) {
- if (Boolean.valueOf(properties.getOrDefault(EnvironmentSettings.BATCH_MODE, "false"))) {
- return new BatchExecutor(executionEnvironment);
- } else {
+ if (Boolean.valueOf(properties.getOrDefault(EnvironmentSettings.STREAMING_MODE, "true"))) {
return new StreamExecutor(executionEnvironment);
+ } else {
+ return new BatchExecutor(executionEnvironment);
}
}
@@ -69,7 +69,7 @@ public class BlinkExecutorFactory implements ExecutorFactory {
@Override
public List<String> supportedProperties() {
- return Arrays.asList(EnvironmentSettings.BATCH_MODE, EnvironmentSettings.CLASS_NAME);
+ return Arrays.asList(EnvironmentSettings.STREAMING_MODE, EnvironmentSettings.CLASS_NAME);
}
@Override
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/operations/DataStreamQueryOperation.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/operations/DataStreamQueryOperation.java
index 7b1d7d2..01c0400 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/operations/DataStreamQueryOperation.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/operations/DataStreamQueryOperation.java
@@ -31,6 +31,8 @@ import java.util.Map;
/**
* Describes a relational operation that reads from a {@link DataStream}.
*
+ * <p>This is only used for testing.
+ *
* <p>This operation may expose only part, or change the order of the fields available in a
* {@link org.apache.flink.api.common.typeutils.CompositeType} of the underlying {@link DataStream}.
* The {@link DataStreamQueryOperation#getFieldIndices()} describes the mapping between fields of the
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/operations/RichTableSourceQueryOperation.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/operations/RichTableSourceQueryOperation.java
index 62e2a42..d572d73 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/operations/RichTableSourceQueryOperation.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/operations/RichTableSourceQueryOperation.java
@@ -20,7 +20,9 @@ package org.apache.flink.table.operations;
import org.apache.flink.annotation.Internal;
import org.apache.flink.table.plan.stats.FlinkStatistic;
+import org.apache.flink.table.sources.StreamTableSource;
import org.apache.flink.table.sources.TableSource;
+import org.apache.flink.util.Preconditions;
import java.util.HashMap;
import java.util.List;
@@ -30,6 +32,8 @@ import java.util.Map;
* A {@link TableSourceQueryOperation} with {@link FlinkStatistic} and qualifiedName.
* TODO this class should be deleted after unique key in TableSchema is ready
* and setting catalog statistic to TableSourceTable in DatabaseCalciteSchema is ready
+ *
+ * <p>This is only used for testing.
*/
@Internal
public class RichTableSourceQueryOperation<T> extends TableSourceQueryOperation<T> {
@@ -37,10 +41,11 @@ public class RichTableSourceQueryOperation<T> extends TableSourceQueryOperation<
private List<String> qualifiedName;
public RichTableSourceQueryOperation(
- TableSource<T> tableSource,
- boolean isBatch,
- FlinkStatistic statistic) {
- super(tableSource, isBatch);
+ TableSource<T> tableSource,
+ FlinkStatistic statistic) {
+ super(tableSource, false);
+ Preconditions.checkArgument(tableSource instanceof StreamTableSource,
+ "Blink planner should always use StreamTableSource.");
this.statistic = statistic;
}
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/BlinkPlannerFactory.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/BlinkPlannerFactory.java
index 8b967d7..2d353df 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/BlinkPlannerFactory.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/BlinkPlannerFactory.java
@@ -46,10 +46,10 @@ public final class BlinkPlannerFactory implements PlannerFactory {
TableConfig tableConfig,
FunctionCatalog functionCatalog,
CatalogManager catalogManager) {
- if (Boolean.valueOf(properties.getOrDefault(EnvironmentSettings.BATCH_MODE, "false"))) {
- return new BatchPlanner(executor, tableConfig, functionCatalog, catalogManager);
- } else {
+ if (Boolean.valueOf(properties.getOrDefault(EnvironmentSettings.STREAMING_MODE, "true"))) {
return new StreamPlanner(executor, tableConfig, functionCatalog, catalogManager);
+ } else {
+ return new BatchPlanner(executor, tableConfig, functionCatalog, catalogManager);
}
}
@@ -67,6 +67,6 @@ public final class BlinkPlannerFactory implements PlannerFactory {
@Override
public List<String> supportedProperties() {
- return Arrays.asList(EnvironmentSettings.BATCH_MODE, EnvironmentSettings.CLASS_NAME);
+ return Arrays.asList(EnvironmentSettings.STREAMING_MODE, EnvironmentSettings.CLASS_NAME);
}
}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/PushFilterIntoTableSourceScanRule.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/PushFilterIntoTableSourceScanRule.scala
index 90d35d6..14f173d 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/PushFilterIntoTableSourceScanRule.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/PushFilterIntoTableSourceScanRule.scala
@@ -139,7 +139,7 @@ class PushFilterIntoTableSourceScanRule extends RelOptRule(
FlinkStatistic.builder().statistic(statistic).tableStats(null).build()
}
val newTableSourceTable = new TableSourceTable(
- newTableSource, tableSourceTable.isStreaming, newStatistic)
+ newTableSource, tableSourceTable.isStreamingMode, newStatistic)
relOptTable.copy(newTableSourceTable, tableSourceTable.getRowType(typeFactory))
}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/PushPartitionIntoTableSourceScanRule.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/PushPartitionIntoTableSourceScanRule.scala
index cd7fd7b..9c9c400 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/PushPartitionIntoTableSourceScanRule.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/PushPartitionIntoTableSourceScanRule.scala
@@ -125,7 +125,7 @@ class PushPartitionIntoTableSourceScanRule extends RelOptRule(
FlinkStatistic.builder().statistic(statistic).tableStats(null).build()
}
val newTableSourceTable = new TableSourceTable(
- newTableSource, tableSourceTable.isStreaming, newStatistic)
+ newTableSource, tableSourceTable.isStreamingMode, newStatistic)
val newRelOptTable = relOptTable.copy(newTableSourceTable, relOptTable.getRowType)
val newScan = new LogicalTableScan(scan.getCluster, scan.getTraitSet, newRelOptTable)
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/PushProjectIntoTableSourceScanRule.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/PushProjectIntoTableSourceScanRule.scala
index 6176872..7518844 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/PushProjectIntoTableSourceScanRule.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/PushProjectIntoTableSourceScanRule.scala
@@ -76,7 +76,7 @@ class PushProjectIntoTableSourceScanRule extends RelOptRule(
}
// project push down does not change the statistic, we can reuse origin statistic
val newTableSourceTable = new TableSourceTable(
- newTableSource, tableSourceTable.isStreaming, tableSourceTable.statistic)
+ newTableSource, tableSourceTable.isStreamingMode, tableSourceTable.statistic)
// row type is changed after project push down
val newRowType = newTableSourceTable.getRowType(scan.getCluster.getTypeFactory)
val newRelOptTable = relOptTable.copy(newTableSourceTable, newRowType)
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/schema/TableSourceSinkTable.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/schema/TableSourceSinkTable.scala
index c421d78..90cb825 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/schema/TableSourceSinkTable.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/schema/TableSourceSinkTable.scala
@@ -53,18 +53,6 @@ class TableSourceSinkTable[T1, T2](
.getOrElse(throw new TableException("Unable to get statistics of table source sink table."))
}
- def isSourceTable: Boolean = tableSourceTable.isDefined
-
- def isStreamSourceTable: Boolean = tableSourceTable match {
- case Some(tst) => tst.isStreaming
- case _ => false
- }
-
- def isBatchSourceTable: Boolean = tableSourceTable match {
- case Some(tst) => !tst.isStreaming
- case _ => false
- }
-
override def copy(statistic: FlinkStatistic): FlinkTable = {
new TableSourceSinkTable[T1, T2](
tableSourceTable.map(source => source.copy(statistic)),
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/schema/TableSourceTable.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/schema/TableSourceTable.scala
index ce4e722..16da7c3 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/schema/TableSourceTable.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/schema/TableSourceTable.scala
@@ -26,10 +26,14 @@ import org.apache.flink.table.sources.{TableSource, TableSourceUtil}
/**
* Abstract class which define the interfaces required to convert a [[TableSource]] to
* a Calcite Table
+ *
+ * @param tableSource The [[TableSource]] for which is converted to a Calcite Table.
+ * @param isStreamingMode A flag that tells if the current table is in stream mode.
+ * @param statistic The table statistics.
*/
class TableSourceTable[T](
val tableSource: TableSource[T],
- val isStreaming: Boolean,
+ val isStreamingMode: Boolean,
val statistic: FlinkStatistic)
extends FlinkTable {
@@ -40,7 +44,7 @@ class TableSourceTable[T](
TableSourceUtil.getRelDataType(
tableSource,
None,
- streaming = isStreaming,
+ streaming = isStreamingMode,
typeFactory.asInstanceOf[FlinkTypeFactory])
}
@@ -51,7 +55,7 @@ class TableSourceTable[T](
* @return Copy of this table, substituting statistic.
*/
override def copy(statistic: FlinkStatistic): TableSourceTable[T] = {
- new TableSourceTable(tableSource, isStreaming, statistic)
+ new TableSourceTable(tableSource, isStreamingMode, statistic)
}
/**
@@ -66,6 +70,6 @@ class TableSourceTable[T](
* @return new TableSourceTable
*/
def replaceTableSource(tableSource: TableSource[T]): TableSourceTable[T] = {
- new TableSourceTable[T](tableSource, isStreaming, statistic)
+ new TableSourceTable[T](tableSource, isStreamingMode, statistic)
}
}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/BatchPlanner.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/BatchPlanner.scala
index 9b4e4f2..b2dedbd 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/BatchPlanner.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/BatchPlanner.scala
@@ -46,7 +46,7 @@ class BatchPlanner(
config: TableConfig,
functionCatalog: FunctionCatalog,
catalogManager: CatalogManager)
- extends PlannerBase(executor, config, functionCatalog, catalogManager) {
+ extends PlannerBase(executor, config, functionCatalog, catalogManager, isStreamingMode = false) {
override protected def getTraitDefs: Array[RelTraitDef[_ <: RelTrait]] = {
Array(
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/PlannerBase.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/PlannerBase.scala
index 021a650..ddb759d 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/PlannerBase.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/PlannerBase.scala
@@ -62,12 +62,15 @@ import _root_.scala.collection.JavaConversions._
* @param config mutable configuration passed from corresponding [[TableEnvironment]]
* @param functionCatalog catalog of functions
* @param catalogManager manager of catalog meta objects such as tables, views, databases etc.
+ * @param isStreamingMode Determines if the planner should work in a batch (false}) or
+ * streaming (true) mode.
*/
abstract class PlannerBase(
executor: Executor,
config: TableConfig,
val functionCatalog: FunctionCatalog,
- catalogManager: CatalogManager)
+ catalogManager: CatalogManager,
+ isStreamingMode: Boolean)
extends Planner {
// temporary utility until we don't use planner expressions anymore
@@ -79,7 +82,7 @@ abstract class PlannerBase(
new PlannerContext(
config,
functionCatalog,
- asRootSchema(new CatalogManagerCalciteSchema(catalogManager)),
+ asRootSchema(new CatalogManagerCalciteSchema(catalogManager, isStreamingMode)),
getTraitDefs.toList
)
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/StreamPlanner.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/StreamPlanner.scala
index 3007d92..b64416d 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/StreamPlanner.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/StreamPlanner.scala
@@ -45,7 +45,7 @@ class StreamPlanner(
config: TableConfig,
functionCatalog: FunctionCatalog,
catalogManager: CatalogManager)
- extends PlannerBase(executor, config, functionCatalog, catalogManager) {
+ extends PlannerBase(executor, config, functionCatalog, catalogManager, isStreamingMode = true) {
override protected def getTraitDefs: Array[RelTraitDef[_ <: RelTrait]] = {
Array(
diff --git a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/plan/nodes/resource/MockNodeTestBase.java b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/plan/nodes/resource/MockNodeTestBase.java
index 8637ed6..4bfbc12 100644
--- a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/plan/nodes/resource/MockNodeTestBase.java
+++ b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/plan/nodes/resource/MockNodeTestBase.java
@@ -53,10 +53,10 @@ import static org.mockito.Mockito.when;
public class MockNodeTestBase {
protected List<ExecNode> nodeList;
- private final boolean isBatch;
+ private final boolean isBatchMode;
- public MockNodeTestBase(boolean isBatch) {
- this.isBatch = isBatch;
+ public MockNodeTestBase(boolean isBatchMode) {
+ this.isBatchMode = isBatchMode;
}
private void updateNode(int index, ExecNode<?, ?> node) {
@@ -90,25 +90,25 @@ public class MockNodeTestBase {
}
protected ExecNode<?, ?> updateCalc(int index) {
- ExecNode<?, ?> node = isBatch ? mock(BatchExecCalc.class) : mock(StreamExecCalc.class);
+ ExecNode<?, ?> node = isBatchMode ? mock(BatchExecCalc.class) : mock(StreamExecCalc.class);
updateNode(index, node);
return node;
}
protected ExecNode<?, ?> updateValues(int index) {
- ExecNode<?, ?> node = isBatch ? mock(BatchExecValues.class) : mock(StreamExecValues.class);
+ ExecNode<?, ?> node = isBatchMode ? mock(BatchExecValues.class) : mock(StreamExecValues.class);
updateNode(index, node);
return node;
}
protected ExecNode<?, ?> updateUnion(int index) {
- ExecNode<?, ?> node = isBatch ? mock(BatchExecUnion.class) : mock(StreamExecUnion.class);
+ ExecNode<?, ?> node = isBatchMode ? mock(BatchExecUnion.class) : mock(StreamExecUnion.class);
updateNode(index, node);
return node;
}
protected ExecNode<?, ?> updateExchange(int index) {
- ExecNode<?, ?> node = isBatch ? mock(BatchExecExchange.class, RETURNS_DEEP_STUBS) :
+ ExecNode<?, ?> node = isBatchMode ? mock(BatchExecExchange.class, RETURNS_DEEP_STUBS) :
mock(StreamExecExchange.class, RETURNS_DEEP_STUBS);
updateNode(index, node);
return node;
@@ -116,7 +116,7 @@ public class MockNodeTestBase {
protected ExecNode<?, ?> updateExchange(int index, RelDistribution.Type type) {
ExecNode<?, ?> node = updateExchange(index);
- if (isBatch) {
+ if (isBatchMode) {
when(((BatchExecExchange) node).getDistribution().getType()).thenReturn(type);
} else {
when(((StreamExecExchange) node).getDistribution().getType()).thenReturn(type);
@@ -125,14 +125,14 @@ public class MockNodeTestBase {
}
protected ExecNode<?, ?> updateTableSource(int index) {
- ExecNode<?, ?> node = isBatch ? mock(BatchExecTableSourceScan.class) : mock(StreamExecTableSourceScan.class);
+ ExecNode<?, ?> node = isBatchMode ? mock(BatchExecTableSourceScan.class) : mock(StreamExecTableSourceScan.class);
updateNode(index, node);
return node;
}
protected ExecNode<?, ?> updateTableSource(int index, int maxParallelism) {
ExecNode<?, ?> node = updateTableSource(index);
- if (isBatch) {
+ if (isBatchMode) {
when(((BatchExecTableSourceScan) node).getSourceTransformation(any()).getMaxParallelism()).thenReturn(maxParallelism);
} else {
when(((StreamExecTableSourceScan) node).getSourceTransformation(any()).getMaxParallelism()).thenReturn(maxParallelism);
@@ -141,14 +141,14 @@ public class MockNodeTestBase {
}
protected ExecNode<?, ?> updateStreamScan(int index) {
- ExecNode<?, ?> node = isBatch ? mock(BatchExecBoundedStreamScan.class) : mock(StreamExecDataStreamScan.class);
+ ExecNode<?, ?> node = isBatchMode ? mock(BatchExecBoundedStreamScan.class) : mock(StreamExecDataStreamScan.class);
updateNode(index, node);
return node;
}
protected ExecNode<?, ?> updateStreamScan(int index, int parallelism) {
ExecNode<?, ?> node = updateStreamScan(index);
- if (isBatch) {
+ if (isBatchMode) {
when(((BatchExecBoundedStreamScan) nodeList.get(4)).getSourceTransformation().getParallelism()).thenReturn(parallelism);
} else {
when(((StreamExecDataStreamScan) nodeList.get(4)).getSourceTransformation().getParallelism()).thenReturn(parallelism);
@@ -159,7 +159,7 @@ public class MockNodeTestBase {
protected void createNodeList(int num) {
nodeList = new LinkedList<>();
for (int i = 0; i < num; i++) {
- ExecNode<?, ?> node = isBatch ? mock(BatchExecCalc.class) : mock(StreamExecCalc.class);
+ ExecNode<?, ?> node = isBatchMode ? mock(BatchExecCalc.class) : mock(StreamExecCalc.class);
when(node.getInputNodes()).thenReturn(new ArrayList<>());
when(node.getResource()).thenReturn(new NodeResource());
when(node.toString()).thenReturn("id: " + i);
diff --git a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/plan/nodes/resource/parallelism/FinalParallelismSetterTest.java b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/plan/nodes/resource/parallelism/FinalParallelismSetterTest.java
index 1254a5c..e9f2234 100644
--- a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/plan/nodes/resource/parallelism/FinalParallelismSetterTest.java
+++ b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/plan/nodes/resource/parallelism/FinalParallelismSetterTest.java
@@ -44,8 +44,8 @@ public class FinalParallelismSetterTest extends MockNodeTestBase {
private StreamExecutionEnvironment sEnv = StreamExecutionEnvironment.getExecutionEnvironment();
- public FinalParallelismSetterTest(boolean isBatch) {
- super(isBatch);
+ public FinalParallelismSetterTest(boolean isBatchMode) {
+ super(isBatchMode);
}
@Before
@@ -108,7 +108,7 @@ public class FinalParallelismSetterTest extends MockNodeTestBase {
assertEquals(1, finalParallelismNodeMap.get(nodeList.get(7)).intValue());
}
- @Parameterized.Parameters(name = "isBatch = {0}")
+ @Parameterized.Parameters(name = "isBatchMode = {0}")
public static Collection<Object[]> runMode() {
return Arrays.asList(
new Object[] { false, },
diff --git a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/plan/nodes/resource/parallelism/ShuffleStageGeneratorTest.java b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/plan/nodes/resource/parallelism/ShuffleStageGeneratorTest.java
index 17c7ad1..e0697bc 100644
--- a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/plan/nodes/resource/parallelism/ShuffleStageGeneratorTest.java
+++ b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/plan/nodes/resource/parallelism/ShuffleStageGeneratorTest.java
@@ -45,8 +45,8 @@ public class ShuffleStageGeneratorTest extends MockNodeTestBase {
private Map<ExecNode<?, ?>, Integer> finalParallelismNodeMap;
- public ShuffleStageGeneratorTest(boolean isBatch) {
- super(isBatch);
+ public ShuffleStageGeneratorTest(boolean isBatchMode) {
+ super(isBatchMode);
}
@Before
@@ -313,7 +313,7 @@ public class ShuffleStageGeneratorTest extends MockNodeTestBase {
}
}
- @Parameterized.Parameters(name = "isBatch = {0}")
+ @Parameterized.Parameters(name = "isBatchMode = {0}")
public static Collection<Object[]> runMode() {
return Arrays.asList(
new Object[] { false, },
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/sql/TableSourceTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/sql/TableSourceTest.xml
index 4d9c790..190f1c6 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/sql/TableSourceTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/sql/TableSourceTest.xml
@@ -16,6 +16,22 @@ See the License for the specific language governing permissions and
limitations under the License.
-->
<Root>
+ <TestCase name="testBoundedStreamTableSource">
+ <Resource name="sql">
+ <![CDATA[SELECT * FROM MyTable]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2])
++- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+]]>
+ </Resource>
+ </TestCase>
<TestCase name="testFilterCannotPushDown">
<Resource name="sql">
<![CDATA[SELECT * FROM FilterableTable WHERE price > 10]]>
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/nodes/resource/ExecNodeResourceTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/nodes/resource/ExecNodeResourceTest.xml
index 8c0a337..c692020 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/nodes/resource/ExecNodeResourceTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/nodes/resource/ExecNodeResourceTest.xml
@@ -16,7 +16,7 @@ See the License for the specific language governing permissions and
limitations under the License.
-->
<Root>
- <TestCase name="testConfigSourceParallelism[isBatch=false]">
+ <TestCase name="testConfigSourceParallelism[isBatchMode=false]">
<Resource name="sql">
<![CDATA[SELECT sum(a) as sum_a, c FROM table3 group by c order by c limit 2]]>
</Resource>
@@ -32,7 +32,7 @@ Calc(select=[sum_a, c], resource=[{parallelism=1}])
]]>
</Resource>
</TestCase>
- <TestCase name="testConfigSourceParallelism[isBatch=true]">
+ <TestCase name="testConfigSourceParallelism[isBatchMode=true]">
<Resource name="sql">
<![CDATA[SELECT sum(a) as sum_a, c FROM table3 group by c order by c limit 2]]>
</Resource>
@@ -50,7 +50,7 @@ Calc(select=[sum_a, c], resource=[{parallelism=1}])
]]>
</Resource>
</TestCase>
- <TestCase name="testSortLimit[isBatch=false]">
+ <TestCase name="testSortLimit[isBatchMode=false]">
<Resource name="sql">
<![CDATA[SELECT sum(a) as sum_a, c FROM table3 group by c order by c limit 2]]>
</Resource>
@@ -66,7 +66,7 @@ Calc(select=[sum_a, c], resource=[{parallelism=1}])
]]>
</Resource>
</TestCase>
- <TestCase name="testSourcePartitionMaxNum[isBatch=true]">
+ <TestCase name="testSourcePartitionMaxNum[isBatchMode=true]">
<Resource name="sql">
<![CDATA[SELECT * FROM table3]]>
</Resource>
@@ -76,7 +76,7 @@ TableSourceScan(table=[[default_catalog, default_database, table3, source: [Mock
]]>
</Resource>
</TestCase>
- <TestCase name="testSortLimit[isBatch=true]">
+ <TestCase name="testSortLimit[isBatchMode=true]">
<Resource name="sql">
<![CDATA[SELECT sum(a) as sum_a, c FROM table3 group by c order by c limit 2]]>
</Resource>
@@ -94,7 +94,7 @@ Calc(select=[sum_a, c], resource=[{parallelism=1}])
]]>
</Resource>
</TestCase>
- <TestCase name="testUnionQuery[isBatch=true]">
+ <TestCase name="testUnionQuery[isBatchMode=true]">
<Resource name="sql">
<![CDATA[SELECT sum(a) as sum_a, g FROM (SELECT a, b, c FROM table3 UNION ALL SELECT a, b, c FROM table4), table5 WHERE b = e group by g]]>
</Resource>
@@ -118,7 +118,7 @@ Calc(select=[sum_a, g], resource=[{parallelism=18}])
]]>
</Resource>
</TestCase>
- <TestCase name="testSourcePartitionMaxNum[isBatch=false]">
+ <TestCase name="testSourcePartitionMaxNum[isBatchMode=false]">
<Resource name="sql">
<![CDATA[SELECT * FROM table3]]>
</Resource>
@@ -128,7 +128,7 @@ TableSourceScan(table=[[default_catalog, default_database, table3, source: [Mock
]]>
</Resource>
</TestCase>
- <TestCase name="testUnionQuery[isBatch=false]">
+ <TestCase name="testUnionQuery[isBatchMode=false]">
<Resource name="sql">
<![CDATA[SELECT sum(a) as sum_a, g FROM (SELECT a, b, c FROM table3 UNION ALL SELECT a, b, c FROM table4), table5 WHERE b = e group by g]]>
</Resource>
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/stream/sql/TableSourceTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/stream/sql/TableSourceTest.xml
index 8fa7a3f..df5adca 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/stream/sql/TableSourceTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/stream/sql/TableSourceTest.xml
@@ -16,6 +16,22 @@ See the License for the specific language governing permissions and
limitations under the License.
-->
<Root>
+ <TestCase name="testBoundedStreamTableSource">
+ <Resource name="sql">
+ <![CDATA[SELECT * FROM MyTable]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2])
++- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+]]>
+ </Resource>
+ </TestCase>
<TestCase name="testFilterCannotPushDown">
<Resource name="sql">
<![CDATA[SELECT * FROM FilterableTable WHERE price > 10]]>
@@ -394,4 +410,20 @@ Calc(select=[rowtime, id, name, val])
]]>
</Resource>
</TestCase>
+ <TestCase name="testUnboundedStreamTableSource">
+ <Resource name="sql">
+ <![CDATA[SELECT * FROM MyTable]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2])
++- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+]]>
+ </Resource>
+ </TestCase>
</Root>
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/catalog/CatalogTableITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/catalog/CatalogTableITCase.scala
index e06ccb2..b33ebfb 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/catalog/CatalogTableITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/catalog/CatalogTableITCase.scala
@@ -34,9 +34,9 @@ import scala.collection.JavaConversions._
/** Test cases for catalog table. */
@RunWith(classOf[Parameterized])
-class CatalogTableITCase(isStreaming: Boolean) {
+class CatalogTableITCase(isStreamingMode: Boolean) {
- private val settings = if (isStreaming) {
+ private val settings = if (isStreamingMode) {
EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
} else {
EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build()
@@ -68,7 +68,7 @@ class CatalogTableITCase(isStreaming: Boolean) {
.getConfiguration
.setInteger(ExecutionConfigOptions.SQL_RESOURCE_DEFAULT_PARALLELISM, 1)
TestCollectionTableFactory.reset()
- TestCollectionTableFactory.isStreaming = isStreaming
+ TestCollectionTableFactory.isStreaming = isStreamingMode
}
def toRow(args: Any*):Row = {
@@ -216,7 +216,7 @@ class CatalogTableITCase(isStreaming: Boolean) {
@Test
def testInsertWithAggregateSource(): Unit = {
- if (isStreaming) {
+ if (isStreamingMode) {
return
}
val sourceData = List(
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/codegen/agg/AggTestBase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/codegen/agg/AggTestBase.scala
index 0237a62..eac535b 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/codegen/agg/AggTestBase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/codegen/agg/AggTestBase.scala
@@ -41,11 +41,11 @@ import org.powermock.api.mockito.PowerMockito.{mock, when}
/**
* Agg test base to mock agg information and etc.
*/
-abstract class AggTestBase(isBatch: Boolean) {
+abstract class AggTestBase(isBatchMode: Boolean) {
val typeFactory: FlinkTypeFactory = new FlinkTypeFactory(new FlinkTypeSystem())
val env = new ScalaStreamExecEnv(new LocalStreamEnvironment)
- private val tEnv = if (isBatch) {
+ private val tEnv = if (isBatchMode) {
val settings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build()
// use impl class instead of interface class to avoid
// "Static methods in interface require -target:jvm-1.8"
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/codegen/agg/AggsHandlerCodeGeneratorTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/codegen/agg/AggsHandlerCodeGeneratorTest.scala
index 2f2ef16..4561147 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/codegen/agg/AggsHandlerCodeGeneratorTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/codegen/agg/AggsHandlerCodeGeneratorTest.scala
@@ -28,7 +28,7 @@ import org.junit.{Assert, Test}
import java.lang
-class AggsHandlerCodeGeneratorTest extends AggTestBase(isBatch = false) {
+class AggsHandlerCodeGeneratorTest extends AggTestBase(isBatchMode = false) {
@Test
def testAvg(): Unit = {
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/codegen/agg/batch/BatchAggTestBase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/codegen/agg/batch/BatchAggTestBase.scala
index edcab6b..e351d95 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/codegen/agg/batch/BatchAggTestBase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/codegen/agg/batch/BatchAggTestBase.scala
@@ -39,7 +39,7 @@ import scala.collection.JavaConverters._
/**
* Base agg test.
*/
-abstract class BatchAggTestBase extends AggTestBase(isBatch = true) {
+abstract class BatchAggTestBase extends AggTestBase(isBatchMode = true) {
val globalOutputType = RowType.of(
Array[LogicalType](
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/batch/sql/TableSourceTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/batch/sql/TableSourceTest.scala
index 78fd39d..41b2f68 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/batch/sql/TableSourceTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/batch/sql/TableSourceTest.scala
@@ -20,8 +20,11 @@ package org.apache.flink.table.plan.batch.sql
import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, LocalTimeTypeInfo, TypeInformation}
import org.apache.flink.api.java.typeutils.RowTypeInfo
-import org.apache.flink.table.api.{DataTypes, TableSchema, Types}
+import org.apache.flink.table.api.{DataTypes, TableSchema, Types, ValidationException}
import org.apache.flink.table.expressions.utils.Func1
+import org.apache.flink.table.sources.TableSource
+import org.apache.flink.table.types.{DataType, TypeInfoDataTypeConverter}
+import org.apache.flink.table.util._
import org.apache.flink.table.types.TypeInfoDataTypeConverter
import org.apache.flink.table.util.{TestPartitionableTableSource, _}
import org.apache.flink.types.Row
@@ -31,12 +34,12 @@ import org.junit.{Before, Test}
class TableSourceTest extends TableTestBase {
private val util = batchTestUtil()
+ private val tableSchema = TableSchema.builder().fields(
+ Array("a", "b", "c"),
+ Array(DataTypes.INT(), DataTypes.BIGINT(), DataTypes.STRING())).build()
@Before
def setup(): Unit = {
- val tableSchema = TableSchema.builder().fields(
- Array("a", "b", "c"),
- Array(DataTypes.INT(), DataTypes.BIGINT(), DataTypes.STRING())).build()
util.tableEnv.registerTableSource("ProjectableTable", new TestProjectableTableSource(
true,
tableSchema,
@@ -50,6 +53,35 @@ class TableSourceTest extends TableTestBase {
}
@Test
+ def testBoundedStreamTableSource(): Unit = {
+ util.tableEnv.registerTableSource("MyTable", new TestTableSource(true, tableSchema))
+ util.verifyPlan("SELECT * FROM MyTable")
+ }
+
+ @Test
+ def testUnboundedStreamTableSource(): Unit = {
+ util.tableEnv.registerTableSource("MyTable", new TestTableSource(false, tableSchema))
+ thrown.expect(classOf[ValidationException])
+ thrown.expectMessage("Only bounded StreamTableSource can be used in batch mode.")
+ util.verifyPlan("SELECT * FROM MyTable")
+ }
+
+ @Test
+ def testNonStreamTableSource(): Unit = {
+ val tableSource = new TableSource[Row]() {
+
+ override def getProducedDataType: DataType = tableSchema.toRowDataType
+
+ override def getTableSchema: TableSchema = tableSchema
+ }
+ util.tableEnv.registerTableSource("MyTable", tableSource)
+ thrown.expect(classOf[ValidationException])
+ thrown.expectMessage(
+ "Only StreamTableSource and LookupableTableSource can be used in Blink planner.")
+ util.verifyPlan("SELECT * FROM MyTable")
+ }
+
+ @Test
def testSimpleProject(): Unit = {
util.verifyPlan("SELECT a, c FROM ProjectableTable")
}
@@ -177,7 +209,7 @@ class TableSourceTest extends TableTestBase {
row.setField(3, DateTimeTestUtil.localDateTime("2017-01-24 12:45:01.234"))
val tableSource = TestFilterableTableSource(
- isBatch = true, rowTypeInfo, Seq(row), Set("dv", "tv", "tsv"))
+ isBounded = true, rowTypeInfo, Seq(row), Set("dv", "tv", "tsv"))
util.tableEnv.registerTableSource("FilterableTable1", tableSource)
val sqlQuery =
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/nodes/resource/ExecNodeResourceTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/nodes/resource/ExecNodeResourceTest.scala
index 6aff288..5b1d050 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/nodes/resource/ExecNodeResourceTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/nodes/resource/ExecNodeResourceTest.scala
@@ -42,21 +42,21 @@ import org.mockito.Mockito.{mock, when}
import java.util
@RunWith(classOf[Parameterized])
-class ExecNodeResourceTest(isBatch: Boolean) extends TableTestBase {
+class ExecNodeResourceTest(isBatchMode: Boolean) extends TableTestBase {
private var testUtil: TableTestUtil = _
@Before
def before(): Unit = {
- testUtil = if(isBatch) batchTestUtil() else streamTestUtil()
+ testUtil = if(isBatchMode) batchTestUtil() else streamTestUtil()
val table3Stats = new TableStats(5000000)
- val table3Source = new MockTableSource(isBatch,
+ val table3Source = new MockTableSource(isBatchMode,
new TableSchema(Array("a", "b", "c"),
Array[TypeInformation[_]](Types.INT, Types.LONG, Types.STRING)))
testUtil.addTableSource(
"table3", table3Source, FlinkStatistic.builder().tableStats(table3Stats).build())
val table5Stats = new TableStats(8000000)
- val table5Source = new MockTableSource(isBatch,
+ val table5Source = new MockTableSource(isBatchMode,
new TableSchema(Array("d", "e", "f", "g", "h"),
Array[TypeInformation[_]](Types.INT, Types.LONG, Types.INT, Types.STRING, Types.LONG)))
testUtil.addTableSource(
@@ -87,7 +87,7 @@ class ExecNodeResourceTest(isBatch: Boolean) extends TableTestBase {
@Test
def testUnionQuery(): Unit = {
val statsOfTable4 = new TableStats(100L)
- val table4Source = new MockTableSource(isBatch,
+ val table4Source = new MockTableSource(isBatchMode,
new TableSchema(Array("a", "b", "c"),
Array[TypeInformation[_]](Types.INT, Types.LONG, Types.STRING)))
testUtil.addTableSource(
@@ -162,7 +162,7 @@ class ExecNodeResourceTest(isBatch: Boolean) extends TableTestBase {
object ExecNodeResourceTest {
- @Parameterized.Parameters(name = "isBatch={0}")
+ @Parameterized.Parameters(name = "isBatchMode={0}")
def parameters(): util.Collection[Array[Any]] = {
util.Arrays.asList(
Array(true),
@@ -180,11 +180,9 @@ object ExecNodeResourceTest {
/**
* Batch/Stream [[org.apache.flink.table.sources.TableSource]] for resource testing.
*/
-class MockTableSource(isBatch: Boolean, schema: TableSchema)
+class MockTableSource(val isBounded: Boolean, schema: TableSchema)
extends StreamTableSource[BaseRow] {
- override def isBounded: Boolean = isBatch
-
override def getDataStream(
execEnv: environment.StreamExecutionEnvironment): DataStream[BaseRow] = {
val transformation = mock(classOf[SourceTransformation[BaseRow]])
@@ -193,7 +191,7 @@ class MockTableSource(isBatch: Boolean, schema: TableSchema)
when(bs.getTransformation).thenReturn(transformation)
when(transformation.getOutputType).thenReturn(getReturnType)
val factory = mock(classOf[StreamOperatorFactory[BaseRow]])
- when(factory.isStreamSource).thenReturn(!isBatch)
+ when(factory.isStreamSource).thenReturn(!isBounded)
when(transformation.getOperatorFactory).thenReturn(factory)
bs
}
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/stream/sql/TableSourceTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/stream/sql/TableSourceTest.scala
index 08b72ec..67724d8 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/stream/sql/TableSourceTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/stream/sql/TableSourceTest.scala
@@ -20,8 +20,10 @@ package org.apache.flink.table.plan.stream.sql
import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, SqlTimeTypeInfo, TypeInformation}
import org.apache.flink.api.java.typeutils.RowTypeInfo
-import org.apache.flink.table.api.{TableSchema, Types}
+import org.apache.flink.table.api.{DataTypes, TableSchema, Types, ValidationException}
import org.apache.flink.table.expressions.utils.Func1
+import org.apache.flink.table.sources.TableSource
+import org.apache.flink.table.types.DataType
import org.apache.flink.table.util._
import org.apache.flink.types.Row
@@ -31,6 +33,10 @@ class TableSourceTest extends TableTestBase {
private val util = streamTestUtil()
+ private val tableSchema = TableSchema.builder().fields(
+ Array("a", "b", "c"),
+ Array(DataTypes.INT(), DataTypes.BIGINT(), DataTypes.STRING())).build()
+
@Before
def setup(): Unit = {
util.tableEnv.registerTableSource("FilterableTable", TestFilterableTableSource(false))
@@ -38,6 +44,33 @@ class TableSourceTest extends TableTestBase {
}
@Test
+ def testBoundedStreamTableSource(): Unit = {
+ util.tableEnv.registerTableSource("MyTable", new TestTableSource(true, tableSchema))
+ util.verifyPlan("SELECT * FROM MyTable")
+ }
+
+ @Test
+ def testUnboundedStreamTableSource(): Unit = {
+ util.tableEnv.registerTableSource("MyTable", new TestTableSource(false, tableSchema))
+ util.verifyPlan("SELECT * FROM MyTable")
+ }
+
+ @Test
+ def testNonStreamTableSource(): Unit = {
+ val tableSource = new TableSource[Row]() {
+
+ override def getProducedDataType: DataType = tableSchema.toRowDataType
+
+ override def getTableSchema: TableSchema = tableSchema
+ }
+ util.tableEnv.registerTableSource("MyTable", tableSource)
+ thrown.expect(classOf[ValidationException])
+ thrown.expectMessage(
+ "Only StreamTableSource and LookupableTableSource can be used in Blink planner.")
+ util.verifyPlan("SELECT * FROM MyTable")
+ }
+
+ @Test
def testTableSourceWithLongRowTimeField(): Unit = {
val tableSchema = new TableSchema(
Array("id", "rowtime", "val", "name"),
@@ -347,7 +380,7 @@ class TableSourceTest extends TableTestBase {
row.setField(3, DateTimeTestUtil.localDateTime("2017-01-24 12:45:01.234"))
val tableSource = TestFilterableTableSource(
- isBatch = false, rowTypeInfo, Seq(row), Set("dv", "tv", "tsv"))
+ isBounded = false, rowTypeInfo, Seq(row), Set("dv", "tv", "tsv"))
util.tableEnv.registerTableSource("FilterableTable1", tableSource)
val sqlQuery =
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/util/TableTestBase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/util/TableTestBase.scala
index 38b5d12..7a26fb6 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/util/TableTestBase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/util/TableTestBase.scala
@@ -105,13 +105,13 @@ abstract class TableTestBase {
}
}
-abstract class TableTestUtilBase(test: TableTestBase, isBatch: Boolean) {
+abstract class TableTestUtilBase(test: TableTestBase, isStreamingMode: Boolean) {
protected lazy val diffRepository: DiffRepository = DiffRepository.lookup(test.getClass)
- protected val setting: EnvironmentSettings = if (isBatch) {
- EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build()
- } else {
+ protected val setting: EnvironmentSettings = if (isStreamingMode) {
EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
+ } else {
+ EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build()
}
// a counter for unique table names
@@ -124,6 +124,8 @@ abstract class TableTestUtilBase(test: TableTestBase, isBatch: Boolean) {
protected def getTableEnv: TableEnvironment
+ protected def isBounded: Boolean = !isStreamingMode
+
def getPlanner: PlannerBase = {
getTableEnv.asInstanceOf[TableEnvironmentImpl].getPlanner.asInstanceOf[PlannerBase]
}
@@ -190,7 +192,7 @@ abstract class TableTestUtilBase(test: TableTestBase, isBatch: Boolean) {
FieldInfoUtils.getFieldsInfo(typeInfo, fields.toArray).toTableSchema
}
- addTableSource(name, new TestTableSource(isBatch, tableSchema))
+ addTableSource(name, new TestTableSource(isBounded, tableSchema))
}
/**
@@ -207,7 +209,7 @@ abstract class TableTestUtilBase(test: TableTestBase, isBatch: Boolean) {
types: Array[TypeInformation[_]],
fields: Array[String]): Table = {
val schema = new TableSchema(fields, types)
- val tableSource = new TestTableSource(isBatch, schema)
+ val tableSource = new TestTableSource(isBounded, schema)
addTableSource(name, tableSource)
}
@@ -478,9 +480,10 @@ abstract class TableTestUtilBase(test: TableTestBase, isBatch: Boolean) {
abstract class TableTestUtil(
test: TableTestBase,
- isBatch: Boolean,
+ // determines if the table environment should work in a batch or streaming mode
+ isStreamingMode: Boolean,
catalogManager: Option[CatalogManager] = None)
- extends TableTestUtilBase(test, isBatch) {
+ extends TableTestUtilBase(test, isStreamingMode) {
protected val testingTableEnv: TestingTableEnvironment =
TestingTableEnvironment.create(setting, catalogManager)
val tableEnv: TableEnvironment = testingTableEnv
@@ -510,7 +513,7 @@ abstract class TableTestUtil(
fields: Array[String],
statistic: FlinkStatistic = FlinkStatistic.UNKNOWN): Table = {
val schema = new TableSchema(fields, types)
- val tableSource = new TestTableSource(isBatch, schema)
+ val tableSource = new TestTableSource(isBounded, schema)
addTableSource(name, tableSource, statistic)
}
@@ -529,7 +532,7 @@ abstract class TableTestUtil(
// TODO RichTableSourceQueryOperation should be deleted and use registerTableSource method
// instead of registerTable method here after unique key in TableSchema is ready
// and setting catalog statistic to TableSourceTable in DatabaseCalciteSchema is ready
- val operation = new RichTableSourceQueryOperation(tableSource, isBatch, statistic)
+ val operation = new RichTableSourceQueryOperation(tableSource, statistic)
val table = testingTableEnv.createTable(operation)
testingTableEnv.registerTable(name, table)
testingTableEnv.scan(name)
@@ -593,8 +596,8 @@ abstract class TableTestUtil(
abstract class ScalaTableTestUtil(
test: TableTestBase,
- isBatch: Boolean)
- extends TableTestUtilBase(test, isBatch) {
+ isStreamingMode: Boolean)
+ extends TableTestUtilBase(test, isStreamingMode) {
// scala env
val env = new ScalaStreamExecEnv(new LocalStreamEnvironment())
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
@@ -620,8 +623,8 @@ abstract class ScalaTableTestUtil(
abstract class JavaTableTestUtil(
test: TableTestBase,
- isBatch: Boolean)
- extends TableTestUtilBase(test, isBatch) {
+ isStreamingMode: Boolean)
+ extends TableTestUtilBase(test, isStreamingMode) {
// java env
val env = new LocalStreamEnvironment()
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
@@ -653,7 +656,7 @@ abstract class JavaTableTestUtil(
case class StreamTableTestUtil(
test: TableTestBase,
catalogManager: Option[CatalogManager] = None)
- extends TableTestUtil(test, false, catalogManager) {
+ extends TableTestUtil(test, isStreamingMode = true, catalogManager) {
/**
* Register a table with specific row time field and offset.
@@ -776,14 +779,13 @@ case class StreamTableTestUtil(
/**
* Utility for stream scala table test.
*/
-case class ScalaStreamTableTestUtil(test: TableTestBase) extends ScalaTableTestUtil(test, false) {
+case class ScalaStreamTableTestUtil(test: TableTestBase) extends ScalaTableTestUtil(test, true) {
}
/**
* Utility for stream java table test.
*/
-case class JavaStreamTableTestUtil(test: TableTestBase) extends JavaTableTestUtil(test, false) {
-
+case class JavaStreamTableTestUtil(test: TableTestBase) extends JavaTableTestUtil(test, true) {
}
/**
@@ -792,7 +794,7 @@ case class JavaStreamTableTestUtil(test: TableTestBase) extends JavaTableTestUti
case class BatchTableTestUtil(
test: TableTestBase,
catalogManager: Option[CatalogManager] = None)
- extends TableTestUtil(test, true, catalogManager) {
+ extends TableTestUtil(test, isStreamingMode = false, catalogManager) {
def buildBatchProgram(firstProgramNameToRemove: String): Unit = {
val program = FlinkBatchProgram.buildProgram(tableEnv.getConfig.getConfiguration)
@@ -835,25 +837,22 @@ case class BatchTableTestUtil(
/**
* Utility for batch scala table test.
*/
-case class ScalaBatchTableTestUtil(test: TableTestBase) extends ScalaTableTestUtil(test, true) {
+case class ScalaBatchTableTestUtil(test: TableTestBase) extends ScalaTableTestUtil(test, false) {
}
/**
* Utility for batch java table test.
*/
-case class JavaBatchTableTestUtil(test: TableTestBase) extends JavaTableTestUtil(test, true) {
+case class JavaBatchTableTestUtil(test: TableTestBase) extends JavaTableTestUtil(test, false) {
}
/**
* Batch/Stream [[org.apache.flink.table.sources.TableSource]] for testing.
*/
-class TestTableSource(isBatch: Boolean, schema: TableSchema)
+class TestTableSource(val isBounded: Boolean, schema: TableSchema)
extends StreamTableSource[Row] {
- override def isBounded: Boolean = isBatch
-
- override def getDataStream(
- execEnv: environment.StreamExecutionEnvironment): DataStream[Row] = {
+ override def getDataStream(execEnv: environment.StreamExecutionEnvironment): DataStream[Row] = {
execEnv.fromCollection(List[Row](), getReturnType)
}
@@ -871,14 +870,14 @@ class TestingTableEnvironment private(
executor: Executor,
functionCatalog: FunctionCatalog,
planner: PlannerBase,
- isStreaming: Boolean)
+ isStreamingMode: Boolean)
extends TableEnvironmentImpl(
catalogManager,
tableConfig,
executor,
functionCatalog,
planner,
- isStreaming) {
+ isStreamingMode) {
private val bufferedOperations: util.List[ModifyOperation] = new util.ArrayList[ModifyOperation]
@@ -1002,7 +1001,7 @@ object TestingTableEnvironment {
.create(plannerProperties, executor, tableConfig, functionCatalog, catalogMgr)
.asInstanceOf[PlannerBase]
new TestingTableEnvironment(
- catalogMgr, tableConfig, executor, functionCatalog, planner, !settings.isBatchMode)
+ catalogMgr, tableConfig, executor, functionCatalog, planner, settings.isStreamingMode)
}
}
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/util/testTableSources.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/util/testTableSources.scala
index b7c4f47..f51b1e2 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/util/testTableSources.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/util/testTableSources.scala
@@ -132,7 +132,7 @@ object TestTableSources {
}
class TestTableSourceWithTime[T](
- isBatch: Boolean,
+ val isBounded: Boolean,
tableSchema: TableSchema,
returnType: TypeInformation[T],
values: Seq[T],
@@ -144,8 +144,6 @@ class TestTableSourceWithTime[T](
with DefinedProctimeAttribute
with DefinedFieldMapping {
- override def isBounded: Boolean = isBatch
-
override def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[T] = {
val dataStream = execEnv.fromCollection(values, returnType)
dataStream.getTransformation.setMaxParallelism(1)
@@ -206,7 +204,7 @@ class TestPreserveWMTableSource[T](
}
class TestProjectableTableSource(
- isBatch: Boolean,
+ isBounded: Boolean,
tableSchema: TableSchema,
returnType: TypeInformation[Row],
values: Seq[Row],
@@ -214,7 +212,7 @@ class TestProjectableTableSource(
proctime: String = null,
fieldMapping: Map[String, String] = null)
extends TestTableSourceWithTime[Row](
- isBatch,
+ isBounded,
tableSchema,
returnType,
values,
@@ -256,7 +254,7 @@ class TestProjectableTableSource(
}
new TestProjectableTableSource(
- isBatch,
+ isBounded,
newTableSchema,
projectedReturnType,
projectedValues,
@@ -272,14 +270,14 @@ class TestProjectableTableSource(
}
class TestNestedProjectableTableSource(
- isBatch: Boolean,
+ isBounded: Boolean,
tableSchema: TableSchema,
returnType: TypeInformation[Row],
values: Seq[Row],
rowtime: String = null,
proctime: String = null)
extends TestTableSourceWithTime[Row](
- isBatch,
+ isBounded,
tableSchema,
returnType,
values,
@@ -316,7 +314,7 @@ class TestNestedProjectableTableSource(
}
val copy = new TestNestedProjectableTableSource(
- isBatch,
+ isBounded,
newTableSchema,
projectedReturnType,
projectedValues,
@@ -336,7 +334,7 @@ class TestNestedProjectableTableSource(
* A data source that implements some very basic filtering in-memory in order to test
* expression push-down logic.
*
- * @param isBatch whether this is a bounded source
+ * @param isBounded whether this is a bounded source
* @param rowTypeInfo The type info for the rows.
* @param data The data that filtering is applied to in order to get the final dataset.
* @param filterableFields The fields that are allowed to be filtered.
@@ -344,7 +342,7 @@ class TestNestedProjectableTableSource(
* @param filterPushedDown Whether predicates have been pushed down yet.
*/
class TestFilterableTableSource(
- isBatch: Boolean,
+ val isBounded: Boolean,
rowTypeInfo: RowTypeInfo,
data: Seq[Row],
filterableFields: Set[String] = Set(),
@@ -357,8 +355,6 @@ class TestFilterableTableSource(
val fieldTypes: Array[TypeInformation[_]] = rowTypeInfo.getFieldTypes
- override def isBounded: Boolean = isBatch
-
override def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[Row] = {
execEnv.fromCollection[Row](applyPredicatesToRows(data).asJava, getReturnType)
.setParallelism(1).setMaxParallelism(1)
@@ -386,7 +382,7 @@ class TestFilterableTableSource(
}
new TestFilterableTableSource(
- isBatch,
+ isBounded,
rowTypeInfo,
data,
filterableFields,
@@ -496,14 +492,14 @@ object TestFilterableTableSource {
/**
* @return The default filterable table source.
*/
- def apply(isBatch: Boolean): TestFilterableTableSource = {
- apply(isBatch, defaultTypeInfo, defaultRows, defaultFilterableFields)
+ def apply(isBounded: Boolean): TestFilterableTableSource = {
+ apply(isBounded, defaultTypeInfo, defaultRows, defaultFilterableFields)
}
/**
* A filterable data source with custom data.
*
- * @param isBatch whether this is a bounded source
+ * @param isBounded whether this is a bounded source
* @param rowTypeInfo The type of the data. Its expected that both types and field
* names are provided.
* @param rows The data as a sequence of rows.
@@ -511,11 +507,11 @@ object TestFilterableTableSource {
* @return The table source.
*/
def apply(
- isBatch: Boolean,
+ isBounded: Boolean,
rowTypeInfo: RowTypeInfo,
rows: Seq[Row],
filterableFields: Set[String]): TestFilterableTableSource = {
- new TestFilterableTableSource(isBatch, rowTypeInfo, rows, filterableFields)
+ new TestFilterableTableSource(isBounded, rowTypeInfo, rows, filterableFields)
}
private lazy val defaultFilterableFields = Set("amount")
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/catalog/CatalogCalciteSchema.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/catalog/CatalogCalciteSchema.java
index 5025adf..9b50b91 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/catalog/CatalogCalciteSchema.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/catalog/CatalogCalciteSchema.java
@@ -41,12 +41,12 @@ import java.util.Set;
@Internal
public class CatalogCalciteSchema implements Schema {
- private final boolean isBatch;
+ private final boolean isStreamingMode;
private final String catalogName;
private final Catalog catalog;
- public CatalogCalciteSchema(boolean isBatch, String catalogName, Catalog catalog) {
- this.isBatch = isBatch;
+ public CatalogCalciteSchema(boolean isStreamingMode, String catalogName, Catalog catalog) {
+ this.isStreamingMode = isStreamingMode;
this.catalogName = catalogName;
this.catalog = catalog;
}
@@ -61,7 +61,7 @@ public class CatalogCalciteSchema implements Schema {
public Schema getSubSchema(String schemaName) {
if (catalog.databaseExists(schemaName)) {
- return new DatabaseCalciteSchema(isBatch, schemaName, catalogName, catalog);
+ return new DatabaseCalciteSchema(isStreamingMode, schemaName, catalogName, catalog);
} else {
return null;
}
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/catalog/CatalogManagerCalciteSchema.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/catalog/CatalogManagerCalciteSchema.java
index ceef249..c017618 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/catalog/CatalogManagerCalciteSchema.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/catalog/CatalogManagerCalciteSchema.java
@@ -49,11 +49,11 @@ import java.util.stream.Stream;
public class CatalogManagerCalciteSchema implements Schema {
private final CatalogManager catalogManager;
- private boolean isBatch;
+ private boolean isStreamingMode;
- public CatalogManagerCalciteSchema(CatalogManager catalogManager, boolean isBatch) {
+ public CatalogManagerCalciteSchema(CatalogManager catalogManager, boolean isStreamingMode) {
this.catalogManager = catalogManager;
- this.isBatch = isBatch;
+ this.isStreamingMode = isStreamingMode;
}
@Override
@@ -89,11 +89,11 @@ public class CatalogManagerCalciteSchema implements Schema {
@Override
public Schema getSubSchema(String name) {
Optional<Schema> externalSchema = catalogManager.getExternalCatalog(name)
- .map(externalCatalog -> new ExternalCatalogSchema(isBatch, name, externalCatalog));
+ .map(externalCatalog -> new ExternalCatalogSchema(isStreamingMode, name, externalCatalog));
return externalSchema.orElseGet(() ->
catalogManager.getCatalog(name)
- .map(catalog -> new CatalogCalciteSchema(isBatch, name, catalog))
+ .map(catalog -> new CatalogCalciteSchema(isStreamingMode, name, catalog))
.orElse(null)
);
}
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/catalog/DatabaseCalciteSchema.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/catalog/DatabaseCalciteSchema.java
index ac7cdb8..46af332 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/catalog/DatabaseCalciteSchema.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/catalog/DatabaseCalciteSchema.java
@@ -51,17 +51,17 @@ import static java.lang.String.format;
* Tables are registered as tables in the schema.
*/
class DatabaseCalciteSchema implements Schema {
- private final boolean isBatch;
+ private final boolean isStreamingMode;
private final String databaseName;
private final String catalogName;
private final Catalog catalog;
public DatabaseCalciteSchema(
- boolean isBatch,
+ boolean isStreamingMode,
String databaseName,
String catalogName,
Catalog catalog) {
- this.isBatch = isBatch;
+ this.isStreamingMode = isStreamingMode;
this.databaseName = databaseName;
this.catalogName = catalogName;
this.catalog = catalog;
@@ -132,7 +132,7 @@ class DatabaseCalciteSchema implements Schema {
// this means the TableSource extends from StreamTableSource, this is needed for the
// legacy Planner. Blink Planner should use the information that comes from the TableSource
// itself to determine if it is a streaming or batch source.
- !isBatch,
+ isStreamingMode,
FlinkStatistic.UNKNOWN()
);
}
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/executor/StreamExecutorFactory.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/executor/StreamExecutorFactory.java
index dff5590..cf9f650 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/executor/StreamExecutorFactory.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/executor/StreamExecutorFactory.java
@@ -60,7 +60,7 @@ public class StreamExecutorFactory implements ExecutorFactory {
@Override
public Map<String, String> requiredContext() {
DescriptorProperties properties = new DescriptorProperties();
- properties.putBoolean(EnvironmentSettings.BATCH_MODE, false);
+ properties.putBoolean(EnvironmentSettings.STREAMING_MODE, true);
return properties.asMap();
}
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/StreamPlannerFactory.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/StreamPlannerFactory.java
index 4efb850..62e19bd 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/StreamPlannerFactory.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/StreamPlannerFactory.java
@@ -59,7 +59,7 @@ public final class StreamPlannerFactory implements PlannerFactory {
public Map<String, String> requiredContext() {
DescriptorProperties properties = new DescriptorProperties();
- properties.putBoolean(EnvironmentSettings.BATCH_MODE, false);
+ properties.putBoolean(EnvironmentSettings.STREAMING_MODE, true);
return properties.asMap();
}
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
index 3452af6..06a04b7 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
@@ -85,22 +85,24 @@ abstract class TableEnvImpl(
private[flink] val operationTreeBuilder = OperationTreeBuilder.create(
functionCatalog,
tableLookup,
- !isBatch)
+ isStreamingMode)
protected val planningConfigurationBuilder: PlanningConfigurationBuilder =
new PlanningConfigurationBuilder(
config,
functionCatalog,
- asRootSchema(new CatalogManagerCalciteSchema(catalogManager, isBatch)),
+ asRootSchema(new CatalogManagerCalciteSchema(catalogManager, isStreamingMode)),
expressionBridge)
def getConfig: TableConfig = config
- private def isBatch: Boolean = this match {
- case _: BatchTableEnvImpl => true
- case _ => false
+ private def isStreamingMode: Boolean = this match {
+ case _: BatchTableEnvImpl => false
+ case _ => true
}
+ private def isBatchTable: Boolean = !isStreamingMode
+
override def registerExternalCatalog(name: String, externalCatalog: ExternalCatalog): Unit = {
catalogManager.registerExternalCatalog(name, externalCatalog)
}
@@ -237,7 +239,7 @@ abstract class TableEnvImpl(
}
override def fromTableSource(source: TableSource[_]): Table = {
- createTable(new TableSourceQueryOperation(source, isBatch))
+ createTable(new TableSourceQueryOperation(source, isBatchTable))
}
/**
@@ -276,12 +278,12 @@ abstract class TableEnvImpl(
replaceTableInternal(
name,
ConnectorCatalogTable
- .sourceAndSink(tableSource, table.getTableSink.get, isBatch))
+ .sourceAndSink(tableSource, table.getTableSink.get, isBatchTable))
}
// no table is registered
case _ =>
- registerTableInternal(name, ConnectorCatalogTable.source(tableSource, isBatch))
+ registerTableInternal(name, ConnectorCatalogTable.source(tableSource, isBatchTable))
}
}
@@ -303,12 +305,12 @@ abstract class TableEnvImpl(
replaceTableInternal(
name,
ConnectorCatalogTable
- .sourceAndSink(table.getTableSource.get, tableSink, isBatch))
+ .sourceAndSink(table.getTableSource.get, tableSink, isBatchTable))
}
// no table is registered
case _ =>
- registerTableInternal(name, ConnectorCatalogTable.sink(tableSink, isBatch))
+ registerTableInternal(name, ConnectorCatalogTable.sink(tableSink, isBatchTable))
}
}
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogSchema.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogSchema.scala
index 4b2287f..88c2620 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogSchema.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogSchema.scala
@@ -42,7 +42,7 @@ import scala.collection.JavaConverters._
*/
@deprecated
class ExternalCatalogSchema(
- isBatch: Boolean,
+ isStreamingMode: Boolean,
catalogIdentifier: String,
catalog: ExternalCatalog) extends Schema with Logging {
@@ -56,7 +56,7 @@ class ExternalCatalogSchema(
override def getSubSchema(name: String): Schema = {
try {
val db = catalog.getSubCatalog(name)
- new ExternalCatalogSchema(isBatch, name, db)
+ new ExternalCatalogSchema(isStreamingMode, name, db)
} catch {
case _: CatalogNotExistException =>
LOG.warn(s"Sub-catalog $name does not exist in externalCatalog $catalogIdentifier")
@@ -81,7 +81,7 @@ class ExternalCatalogSchema(
*/
override def getTable(name: String): Table = try {
val externalCatalogTable = catalog.getTable(name)
- ExternalTableUtil.fromExternalCatalogTable(isBatch, externalCatalogTable).orNull
+ ExternalTableUtil.fromExternalCatalogTable(isStreamingMode, externalCatalogTable).orNull
} catch {
case _: TableNotExistException => {
LOG.warn(s"Table $name does not exist in externalCatalog $catalogIdentifier")
@@ -126,11 +126,12 @@ object ExternalCatalogSchema {
* @param externalCatalog The external catalog to register
*/
def registerCatalog(
- isBatch: Boolean,
+ isStreamingMode: Boolean,
parentSchema: SchemaPlus,
externalCatalogIdentifier: String,
externalCatalog: ExternalCatalog): Unit = {
- val newSchema = new ExternalCatalogSchema(isBatch, externalCatalogIdentifier, externalCatalog)
+ val newSchema = new ExternalCatalogSchema(
+ isStreamingMode, externalCatalogIdentifier, externalCatalog)
val schemaPlusOfNewSchema = parentSchema.add(externalCatalogIdentifier, newSchema)
newSchema.registerSubSchemas(schemaPlusOfNewSchema)
}
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/catalog/ExternalTableUtil.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/catalog/ExternalTableUtil.scala
index 4ac24dd..b55ae2e 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/catalog/ExternalTableUtil.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/catalog/ExternalTableUtil.scala
@@ -39,37 +39,37 @@ object ExternalTableUtil extends Logging {
* @param externalTable the [[ExternalCatalogTable]] instance which to convert
* @return converted [[TableSourceTable]] instance from the input catalog table
*/
- def fromExternalCatalogTable[T](isBatch: Boolean, externalTable: ExternalCatalogTable)
+ def fromExternalCatalogTable[T](isStreamingMode: Boolean, externalTable: ExternalCatalogTable)
: Option[TableSourceTable[T]] = {
val statistics = new FlinkStatistic(toScala(externalTable.getTableStats))
if (externalTable.isTableSource) {
- Some(createTableSource(isBatch, externalTable, statistics))
+ Some(createTableSource(isStreamingMode, externalTable, statistics))
} else {
None
}
}
private def createTableSource[T](
- isBatch: Boolean,
+ isStreamingMode: Boolean,
externalTable: ExternalCatalogTable,
statistics: FlinkStatistic)
: TableSourceTable[T] = {
- val source = if (isModeCompatibleWithTable(isBatch, externalTable)) {
+ val source = if (isModeCompatibleWithTable(isStreamingMode, externalTable)) {
TableFactoryUtil.findAndCreateTableSource(externalTable)
} else {
throw new ValidationException(
"External catalog table does not support the current environment for a table source.")
}
- new TableSourceTable[T](source.asInstanceOf[TableSource[T]], !isBatch, statistics)
+ new TableSourceTable[T](source.asInstanceOf[TableSource[T]], isStreamingMode, statistics)
}
private def isModeCompatibleWithTable[T](
- isBatch: Boolean,
+ isStreamingMode: Boolean,
externalTable: ExternalCatalogTable)
: Boolean = {
- isBatch && externalTable.isBatchTable || !isBatch && externalTable.isStreamTable
+ !isStreamingMode && externalTable.isBatchTable || isStreamingMode && externalTable.isStreamTable
}
}
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalTableSourceScan.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalTableSourceScan.scala
index 89aff89..fb0a09c 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalTableSourceScan.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalTableSourceScan.scala
@@ -50,7 +50,7 @@ class FlinkLogicalTableSourceScan(
override def deriveRowType(): RelDataType = {
val flinkTypeFactory = cluster.getTypeFactory.asInstanceOf[FlinkTypeFactory]
- val streamingTable = table.unwrap(classOf[TableSourceTable[_]]).isStreaming
+ val streamingTable = table.unwrap(classOf[TableSourceTable[_]]).isStreamingMode
TableSourceUtil.getRelDataType(tableSource, selectedFields, streamingTable, flinkTypeFactory)
}
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/rules/dataSet/BatchTableSourceScanRule.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/rules/dataSet/BatchTableSourceScanRule.scala
index c51c99a..e45ad85 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/rules/dataSet/BatchTableSourceScanRule.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/rules/dataSet/BatchTableSourceScanRule.scala
@@ -40,7 +40,7 @@ class BatchTableSourceScanRule
val scan: TableScan = call.rel(0).asInstanceOf[TableScan]
val sourceTable = scan.getTable.unwrap(classOf[TableSourceTable[_]])
- sourceTable != null && !sourceTable.isStreaming
+ sourceTable != null && !sourceTable.isStreamingMode
}
def convert(rel: RelNode): RelNode = {
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/rules/datastream/StreamTableSourceScanRule.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/rules/datastream/StreamTableSourceScanRule.scala
index a23830c..1ddc1c3 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/rules/datastream/StreamTableSourceScanRule.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/rules/datastream/StreamTableSourceScanRule.scala
@@ -41,7 +41,7 @@ class StreamTableSourceScanRule
val scan: TableScan = call.rel(0).asInstanceOf[TableScan]
val sourceTable = scan.getTable.unwrap(classOf[TableSourceTable[_]])
- sourceTable != null && sourceTable.isStreaming
+ sourceTable != null && sourceTable.isStreamingMode
}
def convert(rel: RelNode): RelNode = {
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/schema/TableSourceTable.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/schema/TableSourceTable.scala
index daeda7b..f14fe03 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/schema/TableSourceTable.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/schema/TableSourceTable.scala
@@ -28,10 +28,14 @@ import org.apache.flink.table.sources.{TableSource, TableSourceUtil, TableSource
/**
* Abstract class which define the interfaces required to convert a [[TableSource]] to
* a Calcite Table.
+ *
+ * @param tableSource The [[TableSource]] for which is converted to a Calcite Table.
+ * @param isStreamingMode A flag that tells if the current table is in stream mode.
+ * @param statistic The table statistics.
*/
class TableSourceTable[T](
val tableSource: TableSource[T],
- val isStreaming: Boolean,
+ val isStreamingMode: Boolean,
val statistic: FlinkStatistic)
extends AbstractTable {
@@ -48,7 +52,7 @@ class TableSourceTable[T](
TableSourceUtil.getRelDataType(
tableSource,
None,
- isStreaming,
+ isStreamingMode,
typeFactory.asInstanceOf[FlinkTypeFactory])
}
}
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/StreamPlanner.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/StreamPlanner.scala
index de163a3..057e70a 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/StreamPlanner.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/StreamPlanner.scala
@@ -75,7 +75,7 @@ class StreamPlanner(
functionCatalog.setPlannerTypeInferenceUtil(PlannerTypeInferenceUtilImpl.INSTANCE)
private val internalSchema: CalciteSchema =
- asRootSchema(new CatalogManagerCalciteSchema(catalogManager, false))
+ asRootSchema(new CatalogManagerCalciteSchema(catalogManager, true))
// temporary bridge between API and planner
private val expressionBridge: ExpressionBridge[PlannerExpression] =
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/catalog/DatabaseCalciteSchemaTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/catalog/DatabaseCalciteSchemaTest.java
index 312b5de..ea447bb 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/catalog/DatabaseCalciteSchemaTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/catalog/DatabaseCalciteSchemaTest.java
@@ -48,7 +48,7 @@ public class DatabaseCalciteSchemaTest {
@Test
public void testCatalogTable() throws TableAlreadyExistException, DatabaseNotExistException {
GenericInMemoryCatalog catalog = new GenericInMemoryCatalog(catalogName, databaseName);
- DatabaseCalciteSchema calciteSchema = new DatabaseCalciteSchema(false,
+ DatabaseCalciteSchema calciteSchema = new DatabaseCalciteSchema(true,
databaseName,
catalogName,
catalog);
@@ -59,7 +59,7 @@ public class DatabaseCalciteSchemaTest {
assertThat(table, instanceOf(TableSourceTable.class));
TableSourceTable tableSourceTable = (TableSourceTable) table;
assertThat(tableSourceTable.tableSource(), instanceOf(TestExternalTableSource.class));
- assertThat(tableSourceTable.isStreaming(), is(true));
+ assertThat(tableSourceTable.isStreamingMode(), is(true));
}
private static final class TestCatalogBaseTable extends CatalogTableImpl {
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/catalog/ExternalCatalogSchemaTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/catalog/ExternalCatalogSchemaTest.scala
index c49cd9a..74d203b 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/catalog/ExternalCatalogSchemaTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/catalog/ExternalCatalogSchemaTest.scala
@@ -48,7 +48,7 @@ class ExternalCatalogSchemaTest extends TableTestBase {
def setUp(): Unit = {
val rootSchemaPlus: SchemaPlus = CalciteSchema.createRootSchema(true, false).plus()
val catalog = CommonTestData.getInMemoryTestCatalog(isStreaming = true)
- ExternalCatalogSchema.registerCatalog(false, rootSchemaPlus, schemaName, catalog)
+ ExternalCatalogSchema.registerCatalog(true, rootSchemaPlus, schemaName, catalog)
externalCatalogSchema = rootSchemaPlus.getSubSchema("schemaName")
val typeFactory = new FlinkTypeFactory(new FlinkTypeSystem())
val prop = new Properties()