Posted to dev@flink.apache.org by Zain Haider Nemati <za...@retailo.co> on 2022/05/11 19:41:04 UTC

Incompatible data types while using firehose sink

Hi folks,
I'm getting the error below when sinking data to a KinesisFirehoseSink and
would really appreciate some help!

DataStream<String> inputStream =
        env.addSource(new FlinkKafkaConsumer<>("xxx", new SimpleStringSchema(), properties));

Properties sinkProperties = new Properties();
sinkProperties.put(AWSConfigConstants.AWS_REGION, "xxx");
sinkProperties.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "xxx");
sinkProperties.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "xxx");

KinesisFirehoseSink<String> kdfSink =
        KinesisFirehoseSink.<String>builder()
                .setFirehoseClientProperties(sinkProperties)
                .setSerializationSchema(new SimpleStringSchema())
                .setDeliveryStreamName("xxx")
                .setMaxBatchSize(350)
                .build();

inputStream.sinkTo(kdfSink);


incompatible types: org.apache.flink.connector.firehose.sink.KinesisFirehoseSink<java.lang.String>
cannot be converted to org.apache.flink.api.connector.sink.Sink<java.lang.String,?,?,?>