Posted to issues@flink.apache.org by "Victor Wong (Jira)" <ji...@apache.org> on 2019/12/31 11:10:00 UTC

[jira] [Created] (FLINK-15450) Add kafka topic information to Kafka source

Victor Wong created FLINK-15450:
-----------------------------------

             Summary: Add kafka topic information to Kafka source
                 Key: FLINK-15450
                 URL: https://issues.apache.org/jira/browse/FLINK-15450
             Project: Flink
          Issue Type: Improvement
          Components: Connectors / Kafka
            Reporter: Victor Wong


If the user does not specify a custom name for a source (e.g. a Kafka source), Flink uses the default name "Custom Source", which is not intuitive. The same applies to sinks, which default to "Unnamed".


{code:java}
Source: Custom Source -> Filter -> Map -> Sink: Unnamed
{code}

If we added the Kafka topic information to the default Source/Sink name, it would be much easier to see at a glance which topics a job consumes from and publishes to, like this:

{code:java}
Source: srcTopic0, srcTopic1 -> Filter -> Map -> Sink: sinkTopic0, sinkTopic1
{code}

*Suggestion* (forgive me if it requires too many changes)

1. Add a `name` method to interface `Function`

{code:java}
public interface Function extends java.io.Serializable {
	default String name() { return ""; }
}
{code}

2. Source/Sink/Other functions override this method depending on their needs.

{code:java}
class FlinkKafkaConsumerBase {

  // Must be public: it overrides the default method declared on Function.
  @Override
  public String name() {
    return this.topicsDescriptor.toString();
  }

}
{code}

3. Use Function#name if the returned value is not empty.

{code:java}
// StreamExecutionEnvironment
	public <OUT> DataStreamSource<OUT> addSource(SourceFunction<OUT> function) {
		String sourceName = function.name();
		if (StringUtils.isNullOrWhitespaceOnly(sourceName)) {
			sourceName = "Custom Source";
		}
		return addSource(function, sourceName);
	}
{code}
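
The three steps above can be tried end to end without Flink on the classpath. The following is only a minimal sketch under stated assumptions: `NamedFunction` is a stand-in for Flink's `Function` with the proposed default method, and `KafkaLikeSource`, `AnonymousSource`, `SourceNaming` and `NamingSketch` are hypothetical names invented for illustration. The whitespace check uses `trim().isEmpty()` as an approximation of `StringUtils.isNullOrWhitespaceOnly`.

```java
import java.io.Serializable;
import java.util.Arrays;
import java.util.List;

// Stand-in for Flink's Function interface, with the proposed default method (step 1).
interface NamedFunction extends Serializable {
    default String name() { return ""; }
}

// Hypothetical source that knows its topics and reports them as its name (step 2).
class KafkaLikeSource implements NamedFunction {
    private final List<String> topics;

    KafkaLikeSource(String... topics) {
        this.topics = Arrays.asList(topics);
    }

    @Override
    public String name() {
        return String.join(", ", topics);
    }
}

// Hypothetical source that keeps the default (empty) name.
class AnonymousSource implements NamedFunction {
}

// Fallback logic mirroring the proposed change to addSource (step 3).
class SourceNaming {
    static final String DEFAULT_SOURCE_NAME = "Custom Source";

    static String resolve(NamedFunction function) {
        String name = function.name();
        // Approximation of StringUtils.isNullOrWhitespaceOnly(name).
        if (name == null || name.trim().isEmpty()) {
            return DEFAULT_SOURCE_NAME;
        }
        return name;
    }
}

class NamingSketch {
    public static void main(String[] args) {
        // prints "Source: srcTopic0, srcTopic1"
        System.out.println("Source: " + SourceNaming.resolve(new KafkaLikeSource("srcTopic0", "srcTopic1")));
        // prints "Source: Custom Source"
        System.out.println("Source: " + SourceNaming.resolve(new AnonymousSource()));
    }
}
```

Because the default implementation returns an empty string, existing functions that do not override `name()` would keep today's "Custom Source" label, so the change should be backward compatible for user-defined sources.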



