You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2019/01/04 11:02:00 UTC
[flink] branch release-1.6 updated: [FLINK-11234] [table] Fix
ExternalTableCatalogBuilder unable to build a batch-only table
This is an automated email from the ASF dual-hosted git repository.
twalthr pushed a commit to branch release-1.6
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.6 by this push:
new eeb1814 [FLINK-11234] [table] Fix ExternalTableCatalogBuilder unable to build a batch-only table
eeb1814 is described below
commit eeb1814036243119a60f8133a8591f5b7da0e8f8
Author: EronWright <er...@gmail.com>
AuthorDate: Sun Dec 30 19:59:47 2018 -0800
[FLINK-11234] [table] Fix ExternalTableCatalogBuilder unable to build a batch-only table
- fix the logic in supportsBatch to properly declare a batch-only table
- adjust CommonTestData to provide batch-only or streaming-only tables
This closes #7386.
---
.../apache/flink/table/catalog/ExternalCatalogTable.scala | 4 ++--
.../apache/flink/table/runtime/utils/CommonTestData.scala | 12 +++++++++---
2 files changed, 11 insertions(+), 5 deletions(-)
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogTable.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogTable.scala
index 9576f34..91f27e3 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogTable.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogTable.scala
@@ -268,8 +268,8 @@ class ExternalCatalogTableBuilder(private val connectorDescriptor: ConnectorDesc
* Explicitly declares this external table for supporting only batch environments.
*/
def supportsBatch(): ExternalCatalogTableBuilder = {
- isBatch = false
- isStreaming = true
+ isBatch = true
+ isStreaming = false
this
}
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/utils/CommonTestData.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/utils/CommonTestData.scala
index 64fcc8a..1209595 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/utils/CommonTestData.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/utils/CommonTestData.scala
@@ -85,7 +85,9 @@ object CommonTestData {
.withSchema(schemaDesc1)
if (isStreaming) {
- externalTableBuilder1.inAppendMode()
+ externalTableBuilder1.supportsStreaming().inAppendMode()
+ } else {
+ externalTableBuilder1.supportsBatch()
}
val csvRecord2 = Seq(
@@ -126,7 +128,9 @@ object CommonTestData {
.withSchema(schemaDesc2)
if (isStreaming) {
- externalTableBuilder2.inAppendMode()
+ externalTableBuilder2.supportsStreaming().inAppendMode()
+ } else {
+ externalTableBuilder2.supportsBatch()
}
val tempFilePath3 = writeToTempFile("", "csv-test3", "tmp")
@@ -145,7 +149,9 @@ object CommonTestData {
.withSchema(schemaDesc3)
if (isStreaming) {
- externalTableBuilder3.inAppendMode()
+ externalTableBuilder3.supportsStreaming().inAppendMode()
+ } else {
+ externalTableBuilder3.supportsBatch()
}
val catalog = new InMemoryExternalCatalog("test")