You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@flink.apache.org by "Wei Zhong (Jira)" <ji...@apache.org> on 2021/03/09 03:57:00 UTC

[jira] [Created] (FLINK-21679) Set output type for transformations from SourceProvider and DataStreamScanProvider in CommonExecTableSourceScan

Wei Zhong created FLINK-21679:
---------------------------------

             Summary: Set output type for transformations from SourceProvider and DataStreamScanProvider in CommonExecTableSourceScan
                 Key: FLINK-21679
                 URL: https://issues.apache.org/jira/browse/FLINK-21679
             Project: Flink
          Issue Type: Improvement
          Components: Table SQL / Planner
            Reporter: Wei Zhong
            Assignee: Wei Zhong


Currently we only set output type for the transformations from SourceFunctionProvider and InputFormatProvider in CommonExecTableSourceScan:
{code:java}
@Override
protected Transformation<RowData> translateToPlanInternal(PlannerBase planner) {
    final StreamExecutionEnvironment env = planner.getExecEnv();
    final String operatorName = getDescription();
    final InternalTypeInfo<RowData> outputTypeInfo =
            InternalTypeInfo.of((RowType) getOutputType());
    final ScanTableSource tableSource = tableSourceSpec.getScanTableSource(planner);
    ScanTableSource.ScanRuntimeProvider provider =
            tableSource.getScanRuntimeProvider(ScanRuntimeProviderContext.INSTANCE);
    if (provider instanceof SourceFunctionProvider) {
        SourceFunction<RowData> sourceFunction =
                ((SourceFunctionProvider) provider).createSourceFunction();
        return env.addSource(sourceFunction, operatorName, outputTypeInfo).getTransformation();
    } else if (provider instanceof InputFormatProvider) {
        InputFormat<RowData, ?> inputFormat =
                ((InputFormatProvider) provider).createInputFormat();
        return createInputFormatTransformation(env, inputFormat, outputTypeInfo, operatorName);
    } else if (provider instanceof SourceProvider) {
        // outputTypeInfo is not set here
        Source<RowData, ?, ?> source = ((SourceProvider) provider).createSource();
        return env.fromSource(source, WatermarkStrategy.noWatermarks(), operatorName)
                .getTransformation();
    } else if (provider instanceof DataStreamScanProvider) {
        // outputTypeInfo is not set here
        return ((DataStreamScanProvider) provider).produceDataStream(env).getTransformation();
    } else {
        throw new UnsupportedOperationException(
                provider.getClass().getSimpleName() + " is unsupported now.");
    }
}{code}
We can also set output type for transformations from SourceProvider and DataStreamScanProvider in CommonExecTableSourceScan, so that users do not need to implement a ResultQueryable interface when implementing the new Source interface in FLIP-27.

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)