Posted to dev@flink.apache.org by "jinfeng (Jira)" <ji...@apache.org> on 2022/02/08 16:04:00 UTC
[jira] [Created] (FLINK-26016) FileSystemLookupFunction does not produce correct results when hive table uses columnar storage
jinfeng created FLINK-26016:
-------------------------------
Summary: FileSystemLookupFunction does not produce correct results when hive table uses columnar storage
Key: FLINK-26016
URL: https://issues.apache.org/jira/browse/FLINK-26016
Project: Flink
Issue Type: Bug
Components: Connectors / Hive
Affects Versions: 1.14.3
Reporter: jinfeng
When I use a parquet Hive table as the lookup table, some records cannot be joined. This can be reproduced by adding the following unit test to HiveLookupJoinITCase.
{code:java}
// create the hive table with columnar storage
tableEnv.executeSql(
        String.format(
                "create table columnar_table (x string) STORED AS PARQUET "
                        + "tblproperties ('%s'='5min')",
                HiveOptions.LOOKUP_JOIN_CACHE_TTL.key()));

@Test
public void testLookupJoinTableWithColumnarStorage() throws Exception {
    // constructs test data; as the DEFAULT_SIZE of VectorizedColumnBatch is 2048, we should
    // write at least 2048 records to the test table
    List<Row> testData = new ArrayList<>(4096);
    for (int i = 0; i < 4096; i++) {
        testData.add(Row.of(String.valueOf(i)));
    }

    // register the test data as a values source table
    TableEnvironment batchEnv = HiveTestUtils.createTableEnvInBatchMode(SqlDialect.DEFAULT);
    batchEnv.registerCatalog(hiveCatalog.getName(), hiveCatalog);
    batchEnv.useCatalog(hiveCatalog.getName());
    String dataId = TestValuesTableFactory.registerData(testData);
    batchEnv.executeSql(
            String.format(
                    "create table value_source(x string, p as proctime()) with ("
                            + "'connector' = 'values', 'data-id' = '%s', 'bounded'='true')",
                    dataId));
    batchEnv.executeSql("insert overwrite columnar_table select x from value_source").await();

    TableImpl flinkTable =
            (TableImpl)
                    tableEnv.sqlQuery(
                            "select t.x as x1, c.x as x2 from value_source t "
                                    + "left join columnar_table for system_time as of t.p c "
                                    + "on t.x = c.x where c.x is null");
    List<Row> results = CollectionUtil.iteratorToList(flinkTable.execute().collect());
    assertTrue(results.size() == 0);
}
{code}
The problem may be caused by the following code in FileSystemLookupFunction: the lookup key is extracted from the reused row before the row is copied. With columnar storage the reused row is backed by the current VectorizedColumnBatch, so the key stored in the cache can change as later batches are read.
{code:java}
RowData row;
while ((row = partitionReader.read(reuse)) != null) {
    count++;
    RowData key = extractLookupKey(row);
    List<RowData> rows = cache.computeIfAbsent(key, k -> new ArrayList<>());
    rows.add(serializer.copy(row));
}
{code}
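For readers unfamiliar with object reuse in Flink's readers, here is a minimal standalone sketch of the pitfall (plain Java, not Flink code; all class and method names are hypothetical). When the map key is derived from a reused mutable object without copying it first, every cache entry keeps a reference to that one object, whose fields hold whatever the reader wrote last, so earlier keys can no longer be found:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ReusePitfall {
    // Stand-in for a reused RowData whose fields the reader overwrites in place.
    static final class Row {
        int key;

        Row copy() {
            Row r = new Row();
            r.key = this.key;
            return r;
        }

        @Override
        public boolean equals(Object o) {
            return o instanceof Row && ((Row) o).key == this.key;
        }

        @Override
        public int hashCode() {
            return key;
        }
    }

    // Mirrors the buggy pattern: the reused object itself becomes the map key;
    // only the cached value is copied.
    static Map<Row, List<Row>> buildCacheBuggy(int n) {
        Row reuse = new Row();
        Map<Row, List<Row>> cache = new HashMap<>();
        for (int i = 0; i < n; i++) {
            reuse.key = i; // the "reader" fills the same object on every read
            cache.computeIfAbsent(reuse, k -> new ArrayList<>()).add(reuse.copy());
        }
        return cache;
    }

    // Mirrors the proposed fix: copy first, then derive the key from the copy.
    static Map<Row, List<Row>> buildCacheFixed(int n) {
        Row reuse = new Row();
        Map<Row, List<Row>> cache = new HashMap<>();
        for (int i = 0; i < n; i++) {
            reuse.key = i;
            Row copied = reuse.copy();
            cache.computeIfAbsent(copied, k -> new ArrayList<>()).add(copied);
        }
        return cache;
    }

    public static void main(String[] args) {
        Row probe = new Row();
        probe.key = 0;
        // Buggy cache: the entry for key 0 was inserted under the reused object,
        // whose key field is now 2, so the lookup misses and prints null.
        System.out.println("buggy  lookup of 0 found: " + (buildCacheBuggy(3).get(probe) != null));
        // Fixed cache: each entry has its own key copy, so the lookup hits.
        System.out.println("fixed  lookup of 0 found: " + (buildCacheFixed(3).get(probe) != null));
    }
}
```

The same mechanism explains why the test above needs more than 2048 rows: with fewer rows everything fits in a single VectorizedColumnBatch and the mutation never becomes visible.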
It can be fixed by copying the row before extracting the key:
{code:java}
RowData row;
while ((row = partitionReader.read(reuse)) != null) {
    count++;
    RowData rowData = serializer.copy(row);
    RowData key = extractLookupKey(rowData);
    List<RowData> rows = cache.computeIfAbsent(key, k -> new ArrayList<>());
    rows.add(rowData);
}
{code}
--
This message was sent by Atlassian Jira
(v8.20.1#820001)