You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by lz...@apache.org on 2022/09/09 07:21:02 UTC

[flink-table-store] branch release-0.2 updated: [FLINK-29226] Throw exception for streaming insert overwrite

This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-0.2
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git


The following commit(s) were added to refs/heads/release-0.2 by this push:
     new 0cb007a1 [FLINK-29226] Throw exception for streaming insert overwrite
0cb007a1 is described below

commit 0cb007a15a890ee16a1ac8db90314f0a859bd04e
Author: Nicholas Jiang <pr...@163.com>
AuthorDate: Fri Sep 9 15:07:28 2022 +0800

    [FLINK-29226] Throw exception for streaming insert overwrite
    
    This closes #288
---
 .../table/store/connector/sink/TableStoreSink.java |  5 +++++
 .../store/connector/ReadWriteTableITCase.java      | 26 ++++++++++++++++++++++
 2 files changed, 31 insertions(+)

diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/TableStoreSink.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/TableStoreSink.java
index bb26ea36..484fa82f 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/TableStoreSink.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/TableStoreSink.java
@@ -104,6 +104,11 @@ public class TableStoreSink implements DynamicTableSink, SupportsOverwrite, Supp
 
     @Override
     public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
+        if (overwrite && !context.isBounded()) {
+            throw new UnsupportedOperationException(
+                    "Table store doesn't support streaming INSERT OVERWRITE.");
+        }
+
         LogSinkProvider logSinkProvider = null;
         if (logStoreTableFactory != null) {
             logSinkProvider = logStoreTableFactory.createSinkProvider(this.context, context);
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ReadWriteTableITCase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ReadWriteTableITCase.java
index 0f4d5ea5..0cc8fc07 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ReadWriteTableITCase.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ReadWriteTableITCase.java
@@ -1479,6 +1479,32 @@ public class ReadWriteTableITCase extends ReadWriteTableTestBase {
                 Collections.nCopies(2, changelogRow("+I", "Yen", 1L)));
     }
 
+    @Test
+    public void testStreamingInsertOverwrite() throws Exception {
+        rootPath = TEMPORARY_FOLDER.newFolder().getPath();
+        tEnv =
+                StreamTableEnvironment.create(
+                        buildStreamEnv(), EnvironmentSettings.inStreamingMode());
+        tEnv.executeSql(
+                String.format(
+                        "CREATE TABLE IF NOT EXISTS rates (\n"
+                                + "currency STRING,\n"
+                                + " rate BIGINT,\n"
+                                + " dt STRING\n"
+                                + ") PARTITIONED BY (dt)\n"
+                                + "WITH (\n"
+                                + " 'bucket' = '2',\n"
+                                + " 'root-path' = '%s'\n"
+                                + ")",
+                        rootPath));
+        assertThatThrownBy(
+                        () ->
+                                tEnv.executeSql(
+                                        "INSERT OVERWRITE rates VALUES('US Dollar', 102, '2022-06-20')"))
+                .isInstanceOf(UnsupportedOperationException.class)
+                .hasMessage("Table store doesn't support streaming INSERT OVERWRITE.");
+    }
+
     // ------------------------ Tools ----------------------------------
 
     private String collectAndCheckBatchReadWrite(