You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by lz...@apache.org on 2022/09/09 07:07:34 UTC
[flink-table-store] branch master updated: [FLINK-29226] Throw exception for streaming insert overwrite
This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
The following commit(s) were added to refs/heads/master by this push:
new f35ae349 [FLINK-29226] Throw exception for streaming insert overwrite
f35ae349 is described below
commit f35ae349f115da82dcadb369e607b56e8978f220
Author: Nicholas Jiang <pr...@163.com>
AuthorDate: Fri Sep 9 15:07:28 2022 +0800
[FLINK-29226] Throw exception for streaming insert overwrite
This closes #288
---
.../table/store/connector/sink/TableStoreSink.java | 5 +++++
.../store/connector/ReadWriteTableITCase.java | 26 ++++++++++++++++++++++
2 files changed, 31 insertions(+)
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/TableStoreSink.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/TableStoreSink.java
index bb26ea36..484fa82f 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/TableStoreSink.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/TableStoreSink.java
@@ -104,6 +104,11 @@ public class TableStoreSink implements DynamicTableSink, SupportsOverwrite, Supp
@Override
public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
+ if (overwrite && !context.isBounded()) {
+ throw new UnsupportedOperationException(
+ "Table store doesn't support streaming INSERT OVERWRITE.");
+ }
+
LogSinkProvider logSinkProvider = null;
if (logStoreTableFactory != null) {
logSinkProvider = logStoreTableFactory.createSinkProvider(this.context, context);
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ReadWriteTableITCase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ReadWriteTableITCase.java
index 0f4d5ea5..0cc8fc07 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ReadWriteTableITCase.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ReadWriteTableITCase.java
@@ -1479,6 +1479,32 @@ public class ReadWriteTableITCase extends ReadWriteTableTestBase {
Collections.nCopies(2, changelogRow("+I", "Yen", 1L)));
}
+ @Test
+ public void testStreamingInsertOverwrite() throws Exception {
+ rootPath = TEMPORARY_FOLDER.newFolder().getPath();
+ tEnv =
+ StreamTableEnvironment.create(
+ buildStreamEnv(), EnvironmentSettings.inStreamingMode());
+ tEnv.executeSql(
+ String.format(
+ "CREATE TABLE IF NOT EXISTS rates (\n"
+ + "currency STRING,\n"
+ + " rate BIGINT,\n"
+ + " dt STRING\n"
+ + ") PARTITIONED BY (dt)\n"
+ + "WITH (\n"
+ + " 'bucket' = '2',\n"
+ + " 'root-path' = '%s'\n"
+ + ")",
+ rootPath));
+ assertThatThrownBy(
+ () ->
+ tEnv.executeSql(
+ "INSERT OVERWRITE rates VALUES('US Dollar', 102, '2022-06-20')"))
+ .isInstanceOf(UnsupportedOperationException.class)
+ .hasMessage("Table store doesn't support streaming INSERT OVERWRITE.");
+ }
+
// ------------------------ Tools ----------------------------------
private String collectAndCheckBatchReadWrite(