You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "Victor Wong (Jira)" <ji...@apache.org> on 2020/01/03 07:23:00 UTC
[jira] [Commented] (FLINK-15450) Add kafka topic information to
Kafka source name on Flink UI
[ https://issues.apache.org/jira/browse/FLINK-15450?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17007265#comment-17007265 ]
Victor Wong commented on FLINK-15450:
-------------------------------------
Any suggestion on this:)
> Add kafka topic information to Kafka source name on Flink UI
> ------------------------------------------------------------
>
> Key: FLINK-15450
> URL: https://issues.apache.org/jira/browse/FLINK-15450
> Project: Flink
> Issue Type: Improvement
> Components: API / Core, Connectors / Kafka
> Reporter: Victor Wong
> Priority: Major
>
> If the user did not specify a custom name to the source, e.g. Kafka source, Flink would use the default name "Custom Source", which was not intuitive (Sink was the same).
> {code:java}
> Source: Custom Source -> Filter -> Map -> Sink: Unnamed
> {code}
> If we could add the Kafka topic information to the default Source/Sink name, it would be very helpful to catch the consuming/publishing topic quickly, like this:
> {code:java}
> Source: srcTopic0, srcTopic1 -> Filter -> Map -> Sink: sinkTopic0, sinkTopic1
> {code}
> *Suggestion* (forgive me if it makes too many changes)
> 1. Add a `name` method to interface `Function`
> {code:java}
> public interface Function extends java.io.Serializable {
> default String name() { return ""; }
> }
> {code}
> 2. Source/Sink/Other functions override this method depending on their needs.
> {code:java}
> class FlinkKafkaConsumerBase {
> String name() {
> return this.topicsDescriptor.toString();
> }
> }
> {code}
> 3. Use Function#name if the returned value is not empty.
> {code:java}
> // StreamExecutionEnvironment
> public <OUT> DataStreamSource<OUT> addSource(SourceFunction<OUT> function) {
> String sourceName = function.name();
> if (StringUtils.isNullOrWhitespaceOnly(sourceName)) {
> sourceName = "Custom Source";
> }
> return addSource(function, sourceName);
> }
> {code}
--
This message was sent by Atlassian Jira
(v8.3.4#803005)