You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by lz...@apache.org on 2020/07/14 06:06:27 UTC
[flink] branch master updated: [FLINK-18529][hive] Query Hive table
and filter by timestamp partition can fail
This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 81f41a6 [FLINK-18529][hive] Query Hive table and filter by timestamp partition can fail
81f41a6 is described below
commit 81f41a6d3fda3902e8ee850d4ab8043a0e9bf763
Author: Rui Li <li...@apache.org>
AuthorDate: Tue Jul 14 14:05:40 2020 +0800
[FLINK-18529][hive] Query Hive table and filter by timestamp partition can fail
This closes #12856
---
.../table/catalog/hive/util/HiveTableUtil.java | 21 ++++++++-------------
.../connectors/hive/HiveTableSourceITCase.java | 14 +++++++++++++-
2 files changed, 21 insertions(+), 14 deletions(-)
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/util/HiveTableUtil.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/util/HiveTableUtil.java
index d5d5413..3a04f1a 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/util/HiveTableUtil.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/util/HiveTableUtil.java
@@ -46,6 +46,7 @@ import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
import org.apache.flink.table.functions.FunctionDefinition;
import org.apache.flink.table.functions.hive.conversion.HiveInspectors;
import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalTypeFamily;
import org.apache.flink.table.types.logical.LogicalTypeRoot;
import org.apache.hadoop.hive.conf.HiveConf;
@@ -453,22 +454,16 @@ public class HiveTableUtil {
if (value == null) {
return "null";
}
+ LogicalTypeRoot typeRoot = dataType.getLogicalType().getTypeRoot();
+ if (typeRoot.getFamilies().contains(LogicalTypeFamily.DATETIME)) {
+ // hive not support partition filter push down with these types.
+ return null;
+ }
value = HiveInspectors.getConversion(HiveInspectors.getObjectInspector(dataType), dataType.getLogicalType(), hiveShim)
.toHiveObject(value);
String res = value.toString();
- LogicalTypeRoot typeRoot = dataType.getLogicalType().getTypeRoot();
- switch (typeRoot) {
- case CHAR:
- case VARCHAR:
- res = "'" + res.replace("'", "''") + "'";
- break;
- case DATE:
- case TIMESTAMP_WITHOUT_TIME_ZONE:
- case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
- // hive not support partition filter push down with these types.
- return null;
- default:
- break;
+ if (typeRoot == LogicalTypeRoot.CHAR || typeRoot == LogicalTypeRoot.VARCHAR) {
+ res = "'" + res.replace("'", "''") + "'";
}
return res;
}
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSourceITCase.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSourceITCase.java
index 5170920..b2badf9 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSourceITCase.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSourceITCase.java
@@ -289,6 +289,14 @@ public class HiveTableSourceITCase extends BatchAbstractTestBase {
assertTrue(optimizedPlan, optimizedPlan.contains("PartitionPruned: true, PartitionNums: 1"));
results = Lists.newArrayList(query.execute().collect());
assertEquals("[4]", results.toString());
+
+ query = tableEnv.sqlQuery("select x from db1.part where '' = p2");
+ explain = query.explain().split("==.*==\n");
+ assertFalse(catalog.fallback);
+ optimizedPlan = explain[2];
+ assertTrue(optimizedPlan, optimizedPlan.contains("PartitionPruned: true, PartitionNums: 0"));
+ results = Lists.newArrayList(query.execute().collect());
+ assertEquals("[]", results.toString());
} finally {
tableEnv.executeSql("drop database db1 cascade");
}
@@ -319,7 +327,11 @@ public class HiveTableSourceITCase extends BatchAbstractTestBase {
assertTrue(optimizedPlan, optimizedPlan.contains("PartitionPruned: true, PartitionNums: 1"));
List<Row> results = Lists.newArrayList(query.execute().collect());
assertEquals("[3]", results.toString());
- System.out.println(results);
+
+ // filter by timestamp partition
+ query = tableEnv.sqlQuery("select x from db1.part where timestamp '2018-08-08 08:08:09' = p2");
+ results = Lists.newArrayList(query.execute().collect());
+ assertEquals("[2]", results.toString());
} finally {
tableEnv.executeSql("drop database db1 cascade");
}