Posted to commits@flink.apache.org by ja...@apache.org on 2023/01/30 11:46:16 UTC

[flink] branch release-1.15 updated: [FLINK-30679][connectors/hive] Fix IndexOutOfBoundsException for Hive lookup join when column pushdown to Hive lookup table source (#21783)

This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch release-1.15
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.15 by this push:
     new b412bc315fc [FLINK-30679][connectors/hive] Fix IndexOutOfBoundsException for Hive lookup join when column pushdown to Hive lookup table source (#21783)
b412bc315fc is described below

commit b412bc315fc851c453266cd7f6f98ef3ca0ea747
Author: hehuiyuan <47...@qq.com>
AuthorDate: Mon Jan 30 19:46:05 2023 +0800

    [FLINK-30679][connectors/hive] Fix IndexOutOfBoundsException for Hive lookup join when column pushdown to Hive lookup table source (#21783)
    
    Co-authored-by: hehuiyuan1 <he...@jd.com>
---
 .../org/apache/flink/connectors/hive/HiveLookupTableSource.java   | 4 ++--
 .../org/apache/flink/connectors/hive/HiveLookupJoinITCase.java    | 8 ++++----
 2 files changed, 6 insertions(+), 6 deletions(-)
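
For context on the change below: the lookup partition fetcher is handed field names/types together with projectedFields, and those indices refer to positions in the full table schema. Passing the already-projected (produced) schema alongside projectedFields therefore lets an index point past the end of the shorter field array, which is the likely source of the IndexOutOfBoundsException this commit fixes. The following is a minimal, hypothetical standalone Java sketch of that failure mode (not the actual Flink classes; names and schema are made up for illustration):

    public class ProjectionSketch {

        static String[] project(String[] fieldNames, int[] projectedFields) {
            String[] out = new String[projectedFields.length];
            for (int i = 0; i < projectedFields.length; i++) {
                // Throws ArrayIndexOutOfBoundsException when fieldNames was already
                // projected and projectedFields[i] >= fieldNames.length.
                out[i] = fieldNames[projectedFields[i]];
            }
            return out;
        }

        public static void main(String[] args) {
            // Hypothetical full table schema and a pushdown of columns 0, 2 and 4.
            String[] fullSchema = {"x", "y", "z", "pt_year", "pt_mon"};
            int[] projectedFields = {0, 2, 4}; // indices into the FULL schema

            // Fixed behaviour: resolve the projection against the full schema.
            System.out.println(String.join(",", project(fullSchema, projectedFields)));

            // Buggy behaviour: the produced (already projected) schema has only
            // three fields, so index 4 is out of range.
            String[] producedSchema = {"x", "z", "pt_mon"};
            try {
                project(producedSchema, projectedFields);
            } catch (ArrayIndexOutOfBoundsException e) {
                System.out.println("IndexOutOfBoundsException, as in FLINK-30679: " + e);
            }
        }
    }

This mirrors the fix in HiveLookupTableSource.java below, which passes getTableSchema() (the full schema) instead of getProducedTableSchema() so that projectedFields can select the pushed-down columns from it.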

diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveLookupTableSource.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveLookupTableSource.java
index 296f7cd89eb..8295104dc5e 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveLookupTableSource.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveLookupTableSource.java
@@ -257,8 +257,8 @@ public class HiveLookupTableSource extends HiveTableSource implements LookupTabl
                         jobConf,
                         hiveVersion,
                         tablePath,
-                        getProducedTableSchema().getFieldDataTypes(),
-                        getProducedTableSchema().getFieldNames(),
+                        getTableSchema().getFieldDataTypes(),
+                        getTableSchema().getFieldNames(),
                         catalogTable.getPartitionKeys(),
                         projectedFields,
                         flinkConf.get(HiveOptions.TABLE_EXEC_HIVE_FALLBACK_MAPRED_READER));
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveLookupJoinITCase.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveLookupJoinITCase.java
index 489062e287b..1204ba080d8 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveLookupJoinITCase.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveLookupJoinITCase.java
@@ -364,17 +364,17 @@ public class HiveLookupJoinITCase {
         batchEnv.registerCatalog(hiveCatalog.getName(), hiveCatalog);
         batchEnv.useCatalog(hiveCatalog.getName());
         batchEnv.executeSql(
-                        "insert overwrite bounded_table values (1,'a',10),(2,'a',21),(2,'b',22),(3,'c',33)")
+                        "insert overwrite bounded_table values (1,'a',10),(2,'b',22),(3,'c',33)")
                 .await();
         tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
         TableImpl flinkTable =
                 (TableImpl)
                         tableEnv.sqlQuery(
-                                "select b.x, b.y from "
+                                "select b.x, b.z from "
                                         + " default_catalog.default_database.probe as p "
-                                        + " join bounded_table for system_time as of p.p as b on p.x=b.x and p.y=b.y");
+                                        + " join bounded_table for system_time as of p.p as b on p.x=b.x");
         List<Row> results = CollectionUtil.iteratorToList(flinkTable.execute().collect());
-        assertEquals("[+I[1, a], +I[2, b], +I[3, c]]", results.toString());
+        assertEquals("[+I[1, 10], +I[1, 10], +I[2, 22], +I[2, 22], +I[3, 33]]", results.toString());
     }
 
     @Test