You are viewing a plain text version of this content; the hyperlink to the canonical archived message is not preserved in this rendering.
Posted to commits@flink.apache.org by tw...@apache.org on 2019/01/04 10:51:52 UTC

[flink] branch master updated: [FLINK-11234] [table] Fix ExternalTableCatalogBuilder unable to build a batch-only table

This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 04035ce  [FLINK-11234] [table] Fix ExternalTableCatalogBuilder unable to build a batch-only table
04035ce is described below

commit 04035ce8b7927136c7f1e43f0514a055ce45e56a
Author: EronWright <er...@gmail.com>
AuthorDate: Sun Dec 30 19:59:47 2018 -0800

    [FLINK-11234] [table] Fix ExternalTableCatalogBuilder unable to build a batch-only table
    
    - fix the logic in supportsBatch to properly declare a batch-only table
    - adjust CommonTestData to provide batch-only or streaming-only tables
    
    This closes #7386.
---
 .../apache/flink/table/catalog/ExternalCatalogTable.scala    |  4 ++--
 .../apache/flink/table/runtime/utils/CommonTestData.scala    | 12 +++++++++---
 2 files changed, 11 insertions(+), 5 deletions(-)

diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogTable.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogTable.scala
index 45414ee..ce57070 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogTable.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogTable.scala
@@ -270,8 +270,8 @@ class ExternalCatalogTableBuilder(private val connectorDescriptor: ConnectorDesc
     * Explicitly declares this external table for supporting only batch environments.
     */
   def supportsBatch(): ExternalCatalogTableBuilder = {
-    isBatch = false
-    isStreaming = true
+    isBatch = true
+    isStreaming = false
     this
   }
 
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/utils/CommonTestData.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/utils/CommonTestData.scala
index 64fcc8a..1209595 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/utils/CommonTestData.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/utils/CommonTestData.scala
@@ -85,7 +85,9 @@ object CommonTestData {
       .withSchema(schemaDesc1)
 
     if (isStreaming) {
-      externalTableBuilder1.inAppendMode()
+      externalTableBuilder1.supportsStreaming().inAppendMode()
+    } else {
+      externalTableBuilder1.supportsBatch()
     }
 
     val csvRecord2 = Seq(
@@ -126,7 +128,9 @@ object CommonTestData {
       .withSchema(schemaDesc2)
 
     if (isStreaming) {
-      externalTableBuilder2.inAppendMode()
+      externalTableBuilder2.supportsStreaming().inAppendMode()
+    } else {
+      externalTableBuilder2.supportsBatch()
     }
 
     val tempFilePath3 = writeToTempFile("", "csv-test3", "tmp")
@@ -145,7 +149,9 @@ object CommonTestData {
       .withSchema(schemaDesc3)
 
     if (isStreaming) {
-      externalTableBuilder3.inAppendMode()
+      externalTableBuilder3.supportsStreaming().inAppendMode()
+    } else {
+      externalTableBuilder3.supportsBatch()
     }
 
     val catalog = new InMemoryExternalCatalog("test")