You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by bl...@apache.org on 2019/09/06 02:57:26 UTC
[flink] branch master updated: [FLINK-13952][table-planner][hive]
PartitionableTableSink cannot work with OverwritableTableSink
This is an automated email from the ASF dual-hosted git repository.
bli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new d32af52 [FLINK-13952][table-planner][hive] PartitionableTableSink cannot work with OverwritableTableSink
d32af52 is described below
commit d32af521cbe83f88cd0b822c4d752a1b5102c47c
Author: Rui Li <li...@apache.org>
AuthorDate: Wed Sep 4 21:27:00 2019 +0800
[FLINK-13952][table-planner][hive] PartitionableTableSink cannot work with OverwritableTableSink
To support insert overwrite for partitioned tables.
This closes #9615.
---
.../flink/connectors/hive/TableEnvHiveConnectorTest.java | 16 ++++++++++++++++
.../flink/table/planner/delegation/PlannerBase.scala | 3 +++
.../apache/flink/table/api/internal/TableEnvImpl.scala | 5 ++++-
.../org/apache/flink/table/planner/StreamPlanner.scala | 5 ++++-
4 files changed, 27 insertions(+), 2 deletions(-)
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/TableEnvHiveConnectorTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/TableEnvHiveConnectorTest.java
index 07dd674..e39999a 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/TableEnvHiveConnectorTest.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/TableEnvHiveConnectorTest.java
@@ -177,6 +177,7 @@ public class TableEnvHiveConnectorTest {
public void testInsertOverwrite() throws Exception {
hiveShell.execute("create database db1");
try {
+ // non-partitioned
hiveShell.execute("create table db1.dest (x int, y string)");
hiveShell.insertInto("db1", "dest").addRow(1, "a").addRow(2, "b").commit();
verifyHiveQueryResult("select * from db1.dest", Arrays.asList("1\ta", "2\tb"));
@@ -184,6 +185,21 @@ public class TableEnvHiveConnectorTest {
tableEnv.sqlUpdate("insert overwrite db1.dest values (3,'c')");
tableEnv.execute("test insert overwrite");
verifyHiveQueryResult("select * from db1.dest", Collections.singletonList("3\tc"));
+
+ // static partition
+ hiveShell.execute("create table db1.part(x int) partitioned by (y int)");
+ hiveShell.insertInto("db1", "part").addRow(1, 1).addRow(2, 2).commit();
+ tableEnv = getTableEnvWithHiveCatalog();
+ tableEnv.sqlUpdate("insert overwrite db1.part partition (y=1) select 100");
+ tableEnv.execute("insert overwrite static partition");
+ verifyHiveQueryResult("select * from db1.part", Arrays.asList("100\t1", "2\t2"));
+
+ // dynamic partition
+ tableEnv = getTableEnvWithHiveCatalog();
+ tableEnv.sqlUpdate("insert overwrite db1.part values (200,2),(3,3)");
+ tableEnv.execute("insert overwrite dynamic partition");
+ // only overwrite dynamically matched partitions, other existing partitions remain intact
+ verifyHiveQueryResult("select * from db1.part", Arrays.asList("100\t1", "200\t2", "3\t3"));
} finally {
hiveShell.execute("drop database db1 cascade");
}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala
index f0e18f0..90cdab9 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala
@@ -182,6 +182,9 @@ abstract class PlannerBase(
if partitionableSink.getPartitionFieldNames != null
&& partitionableSink.getPartitionFieldNames.nonEmpty =>
partitionableSink.setStaticPartition(catalogSink.getStaticPartitions)
+ case _ =>
+ }
+ sink match {
case overwritableTableSink: OverwritableTableSink =>
overwritableTableSink.setOverwrite(catalogSink.isOverwrite)
case _ =>
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
index 0dece49..0e00268 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
@@ -474,12 +474,15 @@ abstract class TableEnvImpl(
objectIdentifier,
tableSink)
// set static partitions if it is a partitioned table sink
- // set whether to overwrite if it's an OverwritableTableSink
tableSink match {
case partitionableSink: PartitionableTableSink
if partitionableSink.getPartitionFieldNames != null
&& partitionableSink.getPartitionFieldNames.nonEmpty =>
partitionableSink.setStaticPartition(insertOptions.staticPartitions)
+ case _ =>
+ }
+ // set whether to overwrite if it's an OverwritableTableSink
+ tableSink match {
case overwritableTableSink: OverwritableTableSink =>
overwritableTableSink.setOverwrite(insertOptions.overwrite)
case _ =>
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/StreamPlanner.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/StreamPlanner.scala
index 140198b..10a04de 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/StreamPlanner.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/StreamPlanner.scala
@@ -160,12 +160,15 @@ class StreamPlanner(
identifier,
sink)
// set static partitions if it is a partitioned sink
- // set whether to overwrite if it's an OverwritableTableSink
sink match {
case partitionableSink: PartitionableTableSink
if partitionableSink.getPartitionFieldNames != null
&& partitionableSink.getPartitionFieldNames.nonEmpty =>
partitionableSink.setStaticPartition(catalogSink.getStaticPartitions)
+ case _ =>
+ }
+ // set whether to overwrite if it's an OverwritableTableSink
+ sink match {
case overwritableTableSink: OverwritableTableSink =>
overwritableTableSink.setOverwrite(catalogSink.isOverwrite)
case _ =>