Posted to dev@flink.apache.org by "jinfeng (Jira)" <ji...@apache.org> on 2022/02/08 16:04:00 UTC

[jira] [Created] (FLINK-26016) FileSystemLookupFunction does not produce correct results when hive table uses columnar storage

jinfeng created FLINK-26016:
-------------------------------

             Summary: FileSystemLookupFunction does not produce correct results when hive table uses columnar storage
                 Key: FLINK-26016
                 URL: https://issues.apache.org/jira/browse/FLINK-26016
             Project: Flink
          Issue Type: Bug
          Components: Connectors / Hive
    Affects Versions: 1.14.3
            Reporter: jinfeng


When I use a Parquet Hive table as the lookup table, some records that should match are not joined. This can be reproduced by adding the following unit test to HiveLookupJoinITCase.

{code:java}
        // create the hive table with columnar storage.
        tableEnv.executeSql(
                String.format(
                        "create table columnar_table (x string) STORED AS PARQUET "
                                + "tblproperties ('%s'='5min')",
                        HiveOptions.LOOKUP_JOIN_CACHE_TTL.key()));

    @Test
    public void testLookupJoinTableWithColumnarStorage() throws Exception {
        // Construct test data. Since the DEFAULT_SIZE of VectorizedColumnBatch is 2048, we should
        // write at least 2048 records to the test table.
        List<Row> testData = new ArrayList<>(4096);
        for (int i = 0; i < 4096; i++) {
            testData.add(Row.of(String.valueOf(i)));
        }

        // constructs test data using values table
        TableEnvironment batchEnv = HiveTestUtils.createTableEnvInBatchMode(SqlDialect.DEFAULT);
        batchEnv.registerCatalog(hiveCatalog.getName(), hiveCatalog);
        batchEnv.useCatalog(hiveCatalog.getName());
        String dataId = TestValuesTableFactory.registerData(testData);
        batchEnv.executeSql(
                String.format(
                        "create table value_source(x string, p as proctime()) with ("
                                + "'connector' = 'values', 'data-id' = '%s', 'bounded'='true')",
                        dataId));
        batchEnv.executeSql("insert overwrite columnar_table select x from value_source").await();
        TableImpl flinkTable =
                (TableImpl)
                        tableEnv.sqlQuery(
                                "select t.x as x1, c.x as x2 from value_source t "
                                        + "left join columnar_table for system_time as of t.p c "
                                        + "on t.x = c.x where c.x is null");
        List<Row> results = CollectionUtil.iteratorToList(flinkTable.execute().collect());
        assertTrue(results.size() == 0);
    }
{code}

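The problem may be caused by the following code in FileSystemLookupFunction, where the lookup key is extracted from the reused row before the row is copied.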

{code:java}
RowData row;
while ((row = partitionReader.read(reuse)) != null) {
    count++;
    RowData key = extractLookupKey(row);
    List<RowData> rows = cache.computeIfAbsent(key, k -> new ArrayList<>());
    rows.add(serializer.copy(row));
}
{code}


It can be fixed by copying the row first and extracting the lookup key from the copy:
{code:java}
RowData row;
while ((row = partitionReader.read(reuse)) != null) {
    count++;
    RowData rowData = serializer.copy(row);
    RowData key = extractLookupKey(rowData);
    List<RowData> rows = cache.computeIfAbsent(key, k -> new ArrayList<>());
    rows.add(rowData);
}
{code}
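
To illustrate why the order of copy and key extraction might matter, here is a minimal, self-contained sketch. It is not Flink code and rests on an assumption that has not been verified against the reader implementation: that a key extracted from the uncopied row still points into a batch buffer which the reader overwrites when it loads the next batch. All names in it (ReusedRowKeySketch, Field, buildCache, countMisses) are hypothetical and exist only for this example.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the lookup cache; none of these names are Flink APIs.
class ReusedRowKeySketch {
    static final int BATCH_SIZE = 4; // plays the role of VectorizedColumnBatch.DEFAULT_SIZE

    /** A field value that is a view over a (possibly shared) array, compared by current content. */
    static final class Field {
        final int[] data;
        final int pos;

        Field(int[] data, int pos) {
            this.data = data;
            this.pos = pos;
        }

        /** Deep copy that no longer depends on the shared batch array. */
        Field copy() {
            return new Field(new int[] {data[pos]}, 0);
        }

        int value() {
            return data[pos];
        }

        @Override
        public boolean equals(Object o) {
            return o instanceof Field && ((Field) o).value() == value();
        }

        @Override
        public int hashCode() {
            return Integer.hashCode(value());
        }
    }

    /** Builds the cache; copyBeforeKey selects the proposed order (true) or the current one (false). */
    static Map<Field, List<Field>> buildCache(int total, boolean copyBeforeKey) {
        Map<Field, List<Field>> cache = new HashMap<>();
        int[] batch = new int[BATCH_SIZE]; // one array reused for every batch
        for (int start = 0; start < total; start += BATCH_SIZE) {
            for (int i = 0; i < BATCH_SIZE; i++) {
                batch[i] = start + i; // loading a batch overwrites the previous one
            }
            for (int i = 0; i < BATCH_SIZE; i++) {
                Field row = new Field(batch, i); // view into the reused batch
                Field copied = row.copy();
                Field key = copyBeforeKey ? copied : row;
                cache.computeIfAbsent(key, k -> new ArrayList<>()).add(copied);
            }
        }
        return cache;
    }

    /** Counts how many of the values 0..total-1 can no longer be found in the cache. */
    static int countMisses(Map<Field, List<Field>> cache, int total) {
        int misses = 0;
        for (int v = 0; v < total; v++) {
            if (!cache.containsKey(new Field(new int[] {v}, 0))) {
                misses++;
            }
        }
        return misses;
    }

    public static void main(String[] args) {
        int total = 2 * BATCH_SIZE;
        System.out.println("key from reused row: " + countMisses(buildCache(total, false), total) + " misses");
        System.out.println("key from copied row: " + countMisses(buildCache(total, true), total) + " misses");
    }
}
```

In this sketch, keys taken from the reused view change their content once the next batch is loaded, so lookups for values from earlier batches miss. Keys taken from the copy stay stable. If the assumption holds for the columnar reader, that would match the symptom above, which only shows up once the table holds more rows than fit in one batch.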



