You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by lz...@apache.org on 2020/07/14 06:06:27 UTC

[flink] branch master updated: [FLINK-18529][hive] Query Hive table and filter by timestamp partition can fail

This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 81f41a6  [FLINK-18529][hive] Query Hive table and filter by timestamp partition can fail
81f41a6 is described below

commit 81f41a6d3fda3902e8ee850d4ab8043a0e9bf763
Author: Rui Li <li...@apache.org>
AuthorDate: Tue Jul 14 14:05:40 2020 +0800

    [FLINK-18529][hive] Query Hive table and filter by timestamp partition can fail
    
    This closes #12856
---
 .../table/catalog/hive/util/HiveTableUtil.java      | 21 ++++++++-------------
 .../connectors/hive/HiveTableSourceITCase.java      | 14 +++++++++++++-
 2 files changed, 21 insertions(+), 14 deletions(-)

diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/util/HiveTableUtil.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/util/HiveTableUtil.java
index d5d5413..3a04f1a 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/util/HiveTableUtil.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/util/HiveTableUtil.java
@@ -46,6 +46,7 @@ import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
 import org.apache.flink.table.functions.FunctionDefinition;
 import org.apache.flink.table.functions.hive.conversion.HiveInspectors;
 import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalTypeFamily;
 import org.apache.flink.table.types.logical.LogicalTypeRoot;
 
 import org.apache.hadoop.hive.conf.HiveConf;
@@ -453,22 +454,16 @@ public class HiveTableUtil {
 			if (value == null) {
 				return "null";
 			}
+			LogicalTypeRoot typeRoot = dataType.getLogicalType().getTypeRoot();
+			if (typeRoot.getFamilies().contains(LogicalTypeFamily.DATETIME)) {
+				// Hive does not support partition filter push-down for these types.
+				return null;
+			}
 			value = HiveInspectors.getConversion(HiveInspectors.getObjectInspector(dataType), dataType.getLogicalType(), hiveShim)
 					.toHiveObject(value);
 			String res = value.toString();
-			LogicalTypeRoot typeRoot = dataType.getLogicalType().getTypeRoot();
-			switch (typeRoot) {
-				case CHAR:
-				case VARCHAR:
-					res = "'" + res.replace("'", "''") + "'";
-					break;
-				case DATE:
-				case TIMESTAMP_WITHOUT_TIME_ZONE:
-				case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
-					// hive not support partition filter push down with these types.
-					return null;
-				default:
-					break;
+			if (typeRoot == LogicalTypeRoot.CHAR || typeRoot == LogicalTypeRoot.VARCHAR) {
+				res = "'" + res.replace("'", "''") + "'";
 			}
 			return res;
 		}
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSourceITCase.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSourceITCase.java
index 5170920..b2badf9 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSourceITCase.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSourceITCase.java
@@ -289,6 +289,14 @@ public class HiveTableSourceITCase extends BatchAbstractTestBase {
 			assertTrue(optimizedPlan, optimizedPlan.contains("PartitionPruned: true, PartitionNums: 1"));
 			results = Lists.newArrayList(query.execute().collect());
 			assertEquals("[4]", results.toString());
+
+			query = tableEnv.sqlQuery("select x from db1.part where '' = p2");
+			explain = query.explain().split("==.*==\n");
+			assertFalse(catalog.fallback);
+			optimizedPlan = explain[2];
+			assertTrue(optimizedPlan, optimizedPlan.contains("PartitionPruned: true, PartitionNums: 0"));
+			results = Lists.newArrayList(query.execute().collect());
+			assertEquals("[]", results.toString());
 		} finally {
 			tableEnv.executeSql("drop database db1 cascade");
 		}
@@ -319,7 +327,11 @@ public class HiveTableSourceITCase extends BatchAbstractTestBase {
 			assertTrue(optimizedPlan, optimizedPlan.contains("PartitionPruned: true, PartitionNums: 1"));
 			List<Row> results = Lists.newArrayList(query.execute().collect());
 			assertEquals("[3]", results.toString());
-			System.out.println(results);
+
+			// filter by timestamp partition
+			query = tableEnv.sqlQuery("select x from db1.part where timestamp '2018-08-08 08:08:09' = p2");
+			results = Lists.newArrayList(query.execute().collect());
+			assertEquals("[2]", results.toString());
 		} finally {
 			tableEnv.executeSql("drop database db1 cascade");
 		}