You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by bl...@apache.org on 2019/09/06 02:57:26 UTC

[flink] branch master updated: [FLINK-13952][table-planner][hive] PartitionableTableSink can not work with OverwritableTableSink

This is an automated email from the ASF dual-hosted git repository.

bli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new d32af52  [FLINK-13952][table-planner][hive] PartitionableTableSink can not work with OverwritableTableSink
d32af52 is described below

commit d32af521cbe83f88cd0b822c4d752a1b5102c47c
Author: Rui Li <li...@apache.org>
AuthorDate: Wed Sep 4 21:27:00 2019 +0800

    [FLINK-13952][table-planner][hive] PartitionableTableSink can not work with OverwritableTableSink
    
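    Set the static partitions and the overwrite flag in separate pattern matches, so that a sink implementing both PartitionableTableSink and OverwritableTableSink receives both settings. This is needed to support INSERT OVERWRITE on partitioned tables.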
    
    This closes #9615.
---
 .../flink/connectors/hive/TableEnvHiveConnectorTest.java | 16 ++++++++++++++++
 .../flink/table/planner/delegation/PlannerBase.scala     |  3 +++
 .../apache/flink/table/api/internal/TableEnvImpl.scala   |  5 ++++-
 .../org/apache/flink/table/planner/StreamPlanner.scala   |  5 ++++-
 4 files changed, 27 insertions(+), 2 deletions(-)

diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/TableEnvHiveConnectorTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/TableEnvHiveConnectorTest.java
index 07dd674..e39999a 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/TableEnvHiveConnectorTest.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/TableEnvHiveConnectorTest.java
@@ -177,6 +177,7 @@ public class TableEnvHiveConnectorTest {
 	public void testInsertOverwrite() throws Exception {
 		hiveShell.execute("create database db1");
 		try {
+			// non-partitioned
 			hiveShell.execute("create table db1.dest (x int, y string)");
 			hiveShell.insertInto("db1", "dest").addRow(1, "a").addRow(2, "b").commit();
 			verifyHiveQueryResult("select * from db1.dest", Arrays.asList("1\ta", "2\tb"));
@@ -184,6 +185,21 @@ public class TableEnvHiveConnectorTest {
 			tableEnv.sqlUpdate("insert overwrite db1.dest values (3,'c')");
 			tableEnv.execute("test insert overwrite");
 			verifyHiveQueryResult("select * from db1.dest", Collections.singletonList("3\tc"));
+
+			// static partition
+			hiveShell.execute("create table db1.part(x int) partitioned by (y int)");
+			hiveShell.insertInto("db1", "part").addRow(1, 1).addRow(2, 2).commit();
+			tableEnv = getTableEnvWithHiveCatalog();
+			tableEnv.sqlUpdate("insert overwrite db1.part partition (y=1) select 100");
+			tableEnv.execute("insert overwrite static partition");
+			verifyHiveQueryResult("select * from db1.part", Arrays.asList("100\t1", "2\t2"));
+
+			// dynamic partition
+			tableEnv = getTableEnvWithHiveCatalog();
+			tableEnv.sqlUpdate("insert overwrite db1.part values (200,2),(3,3)");
+			tableEnv.execute("insert overwrite dynamic partition");
+			// only overwrite dynamically matched partitions, other existing partitions remain intact
+			verifyHiveQueryResult("select * from db1.part", Arrays.asList("100\t1", "200\t2", "3\t3"));
 		} finally {
 			hiveShell.execute("drop database db1 cascade");
 		}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala
index f0e18f0..90cdab9 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala
@@ -182,6 +182,9 @@ abstract class PlannerBase(
               if partitionableSink.getPartitionFieldNames != null
                 && partitionableSink.getPartitionFieldNames.nonEmpty =>
               partitionableSink.setStaticPartition(catalogSink.getStaticPartitions)
+            case _ =>
+          }
+          sink match {
             case overwritableTableSink: OverwritableTableSink =>
               overwritableTableSink.setOverwrite(catalogSink.isOverwrite)
             case _ =>
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
index 0dece49..0e00268 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
@@ -474,12 +474,15 @@ abstract class TableEnvImpl(
           objectIdentifier,
           tableSink)
         // set static partitions if it is a partitioned table sink
-        // set whether to overwrite if it's an OverwritableTableSink
         tableSink match {
           case partitionableSink: PartitionableTableSink
             if partitionableSink.getPartitionFieldNames != null
               && partitionableSink.getPartitionFieldNames.nonEmpty =>
             partitionableSink.setStaticPartition(insertOptions.staticPartitions)
+          case _ =>
+        }
+        // set whether to overwrite if it's an OverwritableTableSink
+        tableSink match {
           case overwritableTableSink: OverwritableTableSink =>
             overwritableTableSink.setOverwrite(insertOptions.overwrite)
           case _ =>
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/StreamPlanner.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/StreamPlanner.scala
index 140198b..10a04de 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/StreamPlanner.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/StreamPlanner.scala
@@ -160,12 +160,15 @@ class StreamPlanner(
               identifier,
               sink)
             // set static partitions if it is a partitioned sink
-            // set whether to overwrite if it's an OverwritableTableSink
             sink match {
               case partitionableSink: PartitionableTableSink
                 if partitionableSink.getPartitionFieldNames != null
                   && partitionableSink.getPartitionFieldNames.nonEmpty =>
                 partitionableSink.setStaticPartition(catalogSink.getStaticPartitions)
+              case _ =>
+            }
+            // set whether to overwrite if it's an OverwritableTableSink
+            sink match {
               case overwritableTableSink: OverwritableTableSink =>
                 overwritableTableSink.setOverwrite(catalogSink.isOverwrite)
               case _ =>