You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@flink.apache.org by "Ingo Bürk (Jira)" <ji...@apache.org> on 2021/08/23 06:38:00 UTC

[jira] [Created] (FLINK-23911) Projections are not considered when pushing readable metadata into a source

Ingo Bürk created FLINK-23911:
---------------------------------

             Summary: Projections are not considered when pushing readable metadata into a source
                 Key: FLINK-23911
                 URL: https://issues.apache.org/jira/browse/FLINK-23911
             Project: Flink
          Issue Type: Bug
          Components: Table SQL / Planner
    Affects Versions: 1.13.2
            Reporter: Ingo Bürk


Given a table with a declared schema containing some metadata columns, if we select only some of those metadata columns (or none), the interface of SupportsReadableMetadata states that the planner will perform the projection and only push required metadata keys into the source:
{quote}The planner will select required metadata columns (i.e. perform projection push down) and will call \{@link #applyReadableMetadata(List, DataType)} with a list of metadata keys.{quote}
However, it seems that this doesn't happen, and the planner always applies all metadata declared in the schema instead. This can be a problem because the source has to do unnecessary work, and some metadata might be more expensive to compute than others.

For reference, SupportsProjectionPushDown can not be used to workaround this because it operates only on physical columns, i.e. #applyProjections will never be called with a projection for the metadata columns, even if they are selected.

The following test case can be executed to debug into #applyReadableMetadata of the values table source:
{code:java}
@Test
def test(): Unit = {
  val tableId = TestValuesTableFactory.registerData(Seq())

  tEnv.createTemporaryTable("T", TableDescriptor.forConnector("values")
    .schema(Schema.newBuilder()
      .column("f0", DataTypes.INT())
      .columnByMetadata("m1", DataTypes.STRING())
      .columnByMetadata("m2", DataTypes.STRING())
      .build())
    .option("data-id", tableId)
    .option("bounded", "true")
    .option("readable-metadata", "m1:STRING,m2:STRING")
    .build())

  tEnv.sqlQuery("SELECT f0, m1 FROM T").execute().collect().toList
}
{code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)