You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user-zh@flink.apache.org by Jin Yi <el...@gmail.com> on 2020/02/19 00:14:52 UTC

[Quesetion] how to havee additional Logging in Apache Beam KafkaWriter

Hi there,

I am using Apache Beam (v2.16) in my application, and the Runner is
Flink(1.8). I use KafkaIO connector to consume from source topics and
publish to sink topics.

Here is the class that Apache Beam provides for publishing messages.
https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaWriter.java

Due to requirement, I need to log at info level for every message that has
been published (regardless successful or failed).

So essentially, in this class, I need the logging added below, are there
any suggestions for it?

private class SendCallback implements Callback {
  @Override
  public void onCompletion(RecordMetadata metadata, Exception exception) {
    if (exception == null) {
      LOG.info("PublishToKafkaTopic. Published someId={} to
topic={}",someId, topic);
    } else {
      LOG.error("PublishToKafkaTopic. Error publishing someId={} to
topic={}",someId, topic, exception);
    }

Thanks a lot!

Eleanore