You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@flink.apache.org by "Hang Ruan (Jira)" <ji...@apache.org> on 2021/12/16 09:29:00 UTC

[jira] [Created] (FLINK-25342) DataStream.sinkTo will not add sink to the sinks field in the StreamGraph

Hang Ruan created FLINK-25342:
---------------------------------

             Summary: DataStream.sinkTo will not add sink to the sinks field in the StreamGraph
                 Key: FLINK-25342
                 URL: https://issues.apache.org/jira/browse/FLINK-25342
             Project: Flink
          Issue Type: Bug
    Affects Versions: 1.14.0
            Reporter: Hang Ruan


I run a test in my IDEA and watch the generated StreamGraph. It seems like the sink is not in the field sinks in the StreamGraph. My test is as follows:
{code:java}
@Test
public void selfTest() throws Exception {
    StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment();
    DataStream<String> source = execEnv.fromSource(
            KafkaSource.<String>builder()
                    .setGroupId("flink-kafka-test")
                    .setDeserializer(
                            KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class))
                    .setTopics("scaleDownTest")
                    .setBootstrapServers("localhost:9092")
                    .build(),
            WatermarkStrategy.noWatermarks(), "Kafka Source");

    Properties props = new Properties();
    props.setProperty("transaction.timeout.ms", "900000");
    source.sinkTo(KafkaSink.<String>builder()
            .setBootstrapServers("localhost:9092")
            .setDeliverGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
            .setTransactionalIdPrefix("tp-test-")
            .setKafkaProducerConfig(props)
            .setRecordSerializer(new SelfSerializationSchema("scaleDownTestSink1", new SimpleStringSchema()))
            .build());

    execEnv.execute("ScaleDownTest");
} {code}





--
This message was sent by Atlassian Jira
(v8.20.1#820001)