Posted to issues@flink.apache.org by aljoscha <gi...@git.apache.org> on 2017/08/29 13:59:51 UTC

[GitHub] flink pull request #4616: [FLINK-7552] [FLINK-7553] Enhance SinkInterface / ...

GitHub user aljoscha opened a pull request:

    https://github.com/apache/flink/pull/4616

    [FLINK-7552] [FLINK-7553] Enhance SinkInterface / Use in FlinkKafkaProducer

    ## What is the purpose of the change
    
    Enhance `SinkFunction` with a way of retrieving the element timestamp. This allows us to get rid of the hybrid nature of `FlinkKafkaProducer010`.
    
    This keeps the legacy static "convenience" methods à la `FlinkKafkaProducer010.writeToKafkaWithTimestamps` for backwards compatibility.
    
    ## Brief change log
    
      - Enhance Sink interface
      - Use new interface in Kafka Producer
    
    ## Verifying this change
    
    This change is already covered by existing tests, such as *(please describe tests)*.
    
    ## Does this pull request potentially affect one of the following parts:
    
      - Dependencies (does it add or upgrade a dependency): no
      - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no
      - The serializers: no
      - The runtime per-record code paths (performance sensitive): yes. The call stack of the Kafka producer when writing timestamps changes slightly, and the `StreamSink` operator now has a context object.
      - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: no
    
    ## Documentation
    
      - Does this pull request introduce a new feature? no
      - If yes, how is the feature documented? not applicable
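
A minimal sketch of the compatibility idea in this description, assuming stand-in types: `Sink` and `SinkContext` below are hypothetical and only mirror the shape of the diff, they are not Flink's actual classes. The new context-aware `invoke` is a `default` method that delegates to the old one, so a sink that only overrides the old method keeps working when the caller uses the new method.

```java
import java.util.ArrayList;
import java.util.List;

public class DefaultInvokeSketch {

    /** Hypothetical stand-in for the per-record context; not Flink's actual type. */
    interface SinkContext {
        long timestamp();
    }

    /** Hypothetical stand-in for SinkFunction, mirroring the shape of the diff. */
    interface Sink<IN> {
        /** Old-style method, kept as a no-op default so existing sinks still compile. */
        default void invoke(IN value) throws Exception {}

        /** New-style method; by default it delegates to the old one. */
        default void invoke(SinkContext context, IN value) throws Exception {
            invoke(value);
        }
    }

    /** Legacy sink: only overrides the old single-argument method. */
    static class LegacySink implements Sink<String> {
        final List<String> seen = new ArrayList<>();

        @Override
        public void invoke(String value) {
            seen.add(value);
        }
    }

    /** New sink: overrides the context-aware method and reads the timestamp. */
    static class TimestampedSink implements Sink<String> {
        final List<String> seen = new ArrayList<>();

        @Override
        public void invoke(SinkContext context, String value) {
            seen.add(value + "@" + context.timestamp());
        }
    }

    public static void main(String[] args) throws Exception {
        SinkContext ctx = () -> 42L;
        LegacySink legacy = new LegacySink();
        TimestampedSink timestamped = new TimestampedSink();

        // The caller always uses the two-argument method.
        legacy.invoke(ctx, "a");
        timestamped.invoke(ctx, "a");

        System.out.println(legacy.seen);
        System.out.println(timestamped.seen);
    }
}
```

The trade-off of this shape is that neither method is abstract any more, so the compiler no longer forces an implementer to override one of them.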


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/aljoscha/flink jira-7553-fix-kafka010-producer

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/4616.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #4616
    
----
commit 0b5bea36247736a0160ce584b94050d7b676d091
Author: Aljoscha Krettek <al...@gmail.com>
Date:   2017-08-29T13:50:56Z

    [FLINK-7552] Extend SinkFunction interface with SinkContext

commit d3a7b294542ea40287290ff4970715ead621d398
Author: Aljoscha Krettek <al...@gmail.com>
Date:   2017-08-29T13:53:16Z

    [FLINK-7553] Use new SinkFunction interface in FlinkKafkaProducer010

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #4616: [FLINK-7552] [FLINK-7553] Enhance SinkInterface / ...

Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4616#discussion_r136062152
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/SinkFunction.java ---
    @@ -35,6 +35,40 @@
     	 *
     	 * @param value The input record.
     	 * @throws Exception
    +	 * @deprecated Use {@link #invoke(SinkContext, Object)}.
     	 */
    -	void invoke(IN value) throws Exception;
    +	@Deprecated
    +	default void invoke(IN value) throws Exception {
    +	}
    +
    +	/**
    +	 * Writes the given value to the sink. This function is called for every record.
    +	 *
    +	 * @param context Additional context about the input record.
    +	 * @param value The input record.
    +	 * @throws Exception
    +	 */
    +	default void invoke(SinkContext context, IN value) throws Exception {
    +		invoke(value);
    +	}
    +
    +	/**
    +	 * Context that {@link SinkFunction SinkFunctions } can use for getting additional data about
    +	 * an input record.
    +	 *
    +	 * <p>The context is only valid for the duration of a
    +	 * {@link SinkFunction#invoke(SinkContext, Object)} call. Do not store the context and use
    +	 * afterwards!
    +	 *
    +	 * @param <T> The type of elements accepted by the sink.
    +	 */
    +	@Public // Interface might be extended in the future with additional methods.
    +	interface SinkContext<T> {
    +
    +		/**
    +		 * Returns the timestamp of the current input record.
    +		 */
    +		long timestamp();
    +	}
    +
    --- End diff --
    
    nit/side question: does our code style call for a blank line between the closing braces `}` at the end of the class?


---

[GitHub] flink issue #4616: [FLINK-7552] [FLINK-7553] Enhance SinkInterface / Use in ...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on the issue:

    https://github.com/apache/flink/pull/4616
  
    Can we add fine-grained exclusions, or would this exclude the whole file?


---

[GitHub] flink pull request #4616: [FLINK-7552] [FLINK-7553] Enhance SinkInterface / ...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4616#discussion_r136279356
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/SinkFunction.java ---
    @@ -35,6 +35,40 @@
     	 *
     	 * @param value The input record.
     	 * @throws Exception
    +	 * @deprecated Use {@link #invoke(SinkContext, Object)}.
     	 */
    -	void invoke(IN value) throws Exception;
    +	@Deprecated
    +	default void invoke(IN value) throws Exception {
    +	}
    +
    +	/**
    +	 * Writes the given value to the sink. This function is called for every record.
    +	 *
    +	 * @param context Additional context about the input record.
    +	 * @param value The input record.
    +	 * @throws Exception
    +	 */
    +	default void invoke(SinkContext context, IN value) throws Exception {
    +		invoke(value);
    +	}
    +
    +	/**
    +	 * Context that {@link SinkFunction SinkFunctions } can use for getting additional data about
    +	 * an input record.
    +	 *
    +	 * <p>The context is only valid for the duration of a
    +	 * {@link SinkFunction#invoke(SinkContext, Object)} call. Do not store the context and use
    +	 * afterwards!
    +	 *
    +	 * @param <T> The type of elements accepted by the sink.
    +	 */
    +	@Public // Interface might be extended in the future with additional methods.
    +	interface SinkContext<T> {
    +
    +		/**
    +		 * Returns the timestamp of the current input record.
    +		 */
    +		long timestamp();
    +	}
    +
    --- End diff --
    
    I think our "code style" is "whatever is not rejected by our checkstyle", which is quite loose. 😉 But you're right, I'm removing the line.


---

[GitHub] flink issue #4616: [FLINK-7552] [FLINK-7553] Enhance SinkInterface / Use in ...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on the issue:

    https://github.com/apache/flink/pull/4616
  
    It seems we have to ignore `SinkFunction` completely, because japicmp will check its `serialVersionUID`, which changed when a new method was added to the interface. (And yes, I know that checking the `serialVersionUID` on an interface does not make a lot of sense.)


---

[GitHub] flink pull request #4616: [FLINK-7552] [FLINK-7553] Enhance SinkInterface / ...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha closed the pull request at:

    https://github.com/apache/flink/pull/4616


---

[GitHub] flink issue #4616: [FLINK-7552] [FLINK-7553] Enhance SinkInterface / Use in ...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the issue:

    https://github.com/apache/flink/pull/4616
  
    +1 from my side to merge it like this (using boxed long, dropping the `hasTimestamp()` method).
    
    My reasoning is that this is consistent with `ProcessFunction` (like @EronWright said), it is also not in a hot loop and lazily created, so I do not expect a big performance hit.
    
    @aljoscha and I talked a lot about the whole design of when records have timestamps and whether in the future we should just assume that records always have timestamps. That biased me towards dropping the `hasTimestamp()` method.


---

[GitHub] flink pull request #4616: [FLINK-7552] [FLINK-7553] Enhance SinkInterface / ...

Posted by EronWright <gi...@git.apache.org>.
Github user EronWright commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4616#discussion_r140108752
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/SinkFunction.java ---
    @@ -31,10 +31,50 @@
     public interface SinkFunction<IN> extends Function, Serializable {
     
     	/**
    -	 * Function for standard sink behaviour. This function is called for every record.
    +	 * @deprecated Use {@link #invoke(Object, Context)}.
    +	 */
    +	@Deprecated
    +	default void invoke(IN value) throws Exception {}
    +
    +	/**
    +	 * Writes the given value to the sink. This function is called for every record.
    +	 *
    +	 * <p>You have to override this method when implementing a {@code SinkFunction}, this is a
    +	 * {@code default} method for backward compatibility with the old-style method only.
     	 *
     	 * @param value The input record.
    -	 * @throws Exception
    +	 * @param context Additional context about the input record.
    +	 *
    +	 * @throws Exception This method may throw exceptions. Throwing an exception will cause the operation
    +	 *                   to fail and may trigger recovery.
     	 */
    -	void invoke(IN value) throws Exception;
    +	default void invoke(IN value, Context context) throws Exception {
    +		invoke(value);
    +	}
    +
    +	/**
    +	 * Context that {@link SinkFunction SinkFunctions } can use for getting additional data about
    --- End diff --
    
    Is the link in this comment well-formed?


---

[GitHub] flink pull request #4616: [FLINK-7552] [FLINK-7553] Enhance SinkInterface / ...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4616#discussion_r136575860
  
    --- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java ---
    @@ -311,7 +311,6 @@ public void invoke(IN next) throws Exception {
     		producer.send(record, callback);
     	}
     
    -	@Override
    --- End diff --
    
    That must have happened by accident, fixing...


---

[GitHub] flink pull request #4616: [FLINK-7552] [FLINK-7553] Enhance SinkInterface / ...

Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4616#discussion_r136717253
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/SinkFunction.java ---
    @@ -35,6 +35,39 @@
     	 *
     	 * @param value The input record.
     	 * @throws Exception
    +	 * @deprecated Use {@link #invoke(SinkContext, Object)}.
     	 */
    -	void invoke(IN value) throws Exception;
    +	@Deprecated
    +	default void invoke(IN value) throws Exception {
    +	}
    +
    +	/**
    +	 * Writes the given value to the sink. This function is called for every record.
    +	 *
    +	 * @param context Additional context about the input record.
    +	 * @param value The input record.
    +	 * @throws Exception
    +	 */
    +	default void invoke(SinkContext context, IN value) throws Exception {
    +		invoke(value);
    +	}
    +
    +	/**
    +	 * Context that {@link SinkFunction SinkFunctions } can use for getting additional data about
    +	 * an input record.
    +	 *
    +	 * <p>The context is only valid for the duration of a
    +	 * {@link SinkFunction#invoke(SinkContext, Object)} call. Do not store the context and use
    +	 * afterwards!
    +	 *
    +	 * @param <T> The type of elements accepted by the sink.
    +	 */
    +	@Public // Interface might be extended in the future with additional methods.
    +	interface SinkContext<T> {
    +
    +		/**
    +		 * Returns the timestamp of the current input record.
    +		 */
    +		long timestamp();
    --- End diff --
    
    I would avoid unnecessary boxing wherever possible. If calling `timestamp()` doesn't make sense for processing time, then it would be better to just throw an exception.


---

[GitHub] flink pull request #4616: [FLINK-7552] [FLINK-7553] Enhance SinkInterface / ...

Posted by EronWright <gi...@git.apache.org>.
Github user EronWright commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4616#discussion_r137082133
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/SinkFunction.java ---
    @@ -35,6 +35,39 @@
     	 *
     	 * @param value The input record.
     	 * @throws Exception
    +	 * @deprecated Use {@link #invoke(SinkContext, Object)}.
     	 */
    -	void invoke(IN value) throws Exception;
    +	@Deprecated
    +	default void invoke(IN value) throws Exception {
    +	}
    +
    +	/**
    +	 * Writes the given value to the sink. This function is called for every record.
    +	 *
    +	 * @param context Additional context about the input record.
    +	 * @param value The input record.
    +	 * @throws Exception
    +	 */
    +	default void invoke(SinkContext context, IN value) throws Exception {
    +		invoke(value);
    +	}
    +
    +	/**
    +	 * Context that {@link SinkFunction SinkFunctions } can use for getting additional data about
    +	 * an input record.
    +	 *
    +	 * <p>The context is only valid for the duration of a
    +	 * {@link SinkFunction#invoke(SinkContext, Object)} call. Do not store the context and use
    +	 * afterwards!
    +	 *
    +	 * @param <T> The type of elements accepted by the sink.
    +	 */
    +	@Public // Interface might be extended in the future with additional methods.
    +	interface SinkContext<T> {
    +
    +		/**
    +		 * Returns the timestamp of the current input record.
    +		 */
    +		long timestamp();
    --- End diff --
    
    @pnowojski let us not throw an exception here, given the commented-out section of `StreamRecord::getTimestamp` which suggests a problem with that approach.   It would be good to know why `ProcessFunction` used a `Long` rather than `long`.   I have an almost unhealthy desire for consistency.


---

[GitHub] flink pull request #4616: [FLINK-7552] [FLINK-7553] Enhance SinkInterface / ...

Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4616#discussion_r136334346
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSink.java ---
    @@ -31,14 +31,24 @@
     
     	private static final long serialVersionUID = 1L;
     
    +	private transient SimpleSinkContext sinkContext;
    +
     	public StreamSink(SinkFunction<IN> sinkFunction) {
     		super(sinkFunction);
     		chainingStrategy = ChainingStrategy.ALWAYS;
     	}
     
     	@Override
    +	public void open() throws Exception {
    +		super.open();
    +
    +		this.sinkContext = new SimpleSinkContext<>();
    +	}
    +
    +	@Override
     	public void processElement(StreamRecord<IN> element) throws Exception {
    -		userFunction.invoke(element.getValue());
    +		sinkContext.element = element;
    +		userFunction.invoke(sinkContext, element.getValue());
    --- End diff --
    
    I get it; however, I still do not like the object repacking done here. Instead of passing `StreamRecord` directly, we could introduce a `UserRecord` interface with a `getTimestamp()` method that would be implemented by `StreamRecord`.
    
    But that's a minor complaint. If you have a stronger preference for keeping it as it is, that's fine with me.
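
A rough sketch of the pattern under discussion, assuming stand-in types: `Record`, `SimpleContext` and `SinkOperator` below are hypothetical simplifications, not Flink's `StreamRecord` or `StreamSink`. The operator allocates one context up front and re-points it at each incoming record, so nothing is allocated per record. It also shows why the Javadoc in the diff warns against storing the context.

```java
import java.util.ArrayList;
import java.util.List;

public class ReusedContextSketch {

    /** Hypothetical stand-in for StreamRecord: a value plus a timestamp. */
    static final class Record<T> {
        final T value;
        final long timestamp;

        Record(T value, long timestamp) {
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    interface Context {
        long timestamp();
    }

    interface Sink<IN> {
        void invoke(Context context, IN value) throws Exception;
    }

    /** Mutable context that the operator re-points at the current record. */
    static final class SimpleContext<IN> implements Context {
        Record<IN> element;

        @Override
        public long timestamp() {
            return element.timestamp;
        }
    }

    /** Hypothetical stand-in for the sink operator. */
    static final class SinkOperator<IN> {
        private final Sink<IN> userFunction;
        private final SimpleContext<IN> context = new SimpleContext<>();

        SinkOperator(Sink<IN> userFunction) {
            this.userFunction = userFunction;
        }

        void processElement(Record<IN> element) throws Exception {
            context.element = element; // reuse the same context, no per-record allocation
            userFunction.invoke(context, element.value);
        }
    }

    public static void main(String[] args) throws Exception {
        List<Context> stored = new ArrayList<>();
        SinkOperator<String> op = new SinkOperator<String>((ctx, value) -> stored.add(ctx));

        op.processElement(new Record<>("a", 1L));
        op.processElement(new Record<>("b", 2L));

        // Both stored references are the same object, now pointing at record "b".
        System.out.println(stored.get(0) == stored.get(1));
        System.out.println(stored.get(0).timestamp());
    }
}
```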


---

[GitHub] flink issue #4616: [FLINK-7552] [FLINK-7553] Enhance SinkInterface / Use in ...

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on the issue:

    https://github.com/apache/flink/pull/4616
  
    I see. This is similar to the problem we discussed for the `SourceContext`. Users only implement it, but don't call it.
    
    I believe in this case, it makes sense to add an exception to Japicmp to allow this change.


---

[GitHub] flink pull request #4616: [FLINK-7552] [FLINK-7553] Enhance SinkInterface / ...

Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4616#discussion_r136563378
  
    --- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java ---
    @@ -311,7 +311,6 @@ public void invoke(IN next) throws Exception {
     		producer.send(record, callback);
     	}
     
    -	@Override
    --- End diff --
    
    Why did you remove `@Override`?


---

[GitHub] flink issue #4616: [FLINK-7552] [FLINK-7553] Enhance SinkInterface / Use in ...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on the issue:

    https://github.com/apache/flink/pull/4616
  
    R: @rmetzger @pnowojski Could you please review this?


---

[GitHub] flink pull request #4616: [FLINK-7552] [FLINK-7553] Enhance SinkInterface / ...

Posted by EronWright <gi...@git.apache.org>.
Github user EronWright commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4616#discussion_r136615982
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/SinkFunction.java ---
    @@ -35,6 +35,39 @@
     	 *
     	 * @param value The input record.
     	 * @throws Exception
    +	 * @deprecated Use {@link #invoke(SinkContext, Object)}.
     	 */
    -	void invoke(IN value) throws Exception;
    +	@Deprecated
    +	default void invoke(IN value) throws Exception {
    +	}
    +
    +	/**
    +	 * Writes the given value to the sink. This function is called for every record.
    +	 *
    +	 * @param context Additional context about the input record.
    +	 * @param value The input record.
    +	 * @throws Exception
    +	 */
    +	default void invoke(SinkContext context, IN value) throws Exception {
    +		invoke(value);
    +	}
    +
    +	/**
    +	 * Context that {@link SinkFunction SinkFunctions } can use for getting additional data about
    +	 * an input record.
    +	 *
    +	 * <p>The context is only valid for the duration of a
    +	 * {@link SinkFunction#invoke(SinkContext, Object)} call. Do not store the context and use
    +	 * afterwards!
    +	 *
    +	 * @param <T> The type of elements accepted by the sink.
    +	 */
    +	@Public // Interface might be extended in the future with additional methods.
    +	interface SinkContext<T> {
    --- End diff --
    
    Looking at `ProcessFunction`, the `Context` is an abstract class and has a simpler name.  For the sake of consistency, consider the same.


---

[GitHub] flink pull request #4616: [FLINK-7552] [FLINK-7553] Enhance SinkInterface / ...

Posted by EronWright <gi...@git.apache.org>.
Github user EronWright commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4616#discussion_r136615089
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/SinkFunction.java ---
    @@ -35,6 +35,39 @@
     	 *
     	 * @param value The input record.
     	 * @throws Exception
    +	 * @deprecated Use {@link #invoke(SinkContext, Object)}.
     	 */
    -	void invoke(IN value) throws Exception;
    +	@Deprecated
    +	default void invoke(IN value) throws Exception {
    +	}
    +
    +	/**
    +	 * Writes the given value to the sink. This function is called for every record.
    +	 *
    +	 * @param context Additional context about the input record.
    +	 * @param value The input record.
    +	 * @throws Exception
    +	 */
    +	default void invoke(SinkContext context, IN value) throws Exception {
    +		invoke(value);
    +	}
    +
    +	/**
    +	 * Context that {@link SinkFunction SinkFunctions } can use for getting additional data about
    +	 * an input record.
    +	 *
    +	 * <p>The context is only valid for the duration of a
    +	 * {@link SinkFunction#invoke(SinkContext, Object)} call. Do not store the context and use
    +	 * afterwards!
    +	 *
    +	 * @param <T> The type of elements accepted by the sink.
    +	 */
    +	@Public // Interface might be extended in the future with additional methods.
    +	interface SinkContext<T> {
    +
    +		/**
    +		 * Returns the timestamp of the current input record.
    +		 */
    +		long timestamp();
    --- End diff --
    
    Consider returning a `Long` to better match the `Context` interface in `ProcessFunction`, and document the behavior when the time characteristic is `ProcessingTime`.
    ```
    		/**
    		 * Timestamp of the element currently being processed or timestamp of a firing timer.
    		 *
    		 * <p>This might be {@code null}, for example if the time characteristic of your program
    		 * is set to {@link org.apache.flink.streaming.api.TimeCharacteristic#ProcessingTime}.
    		 */
    		public abstract Long timestamp();
    ```
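
To illustrate what a nullable `Long` would mean for sink authors, here is a small sketch. The `Context` interface and the `timestampOrDefault` helper are hypothetical names used only for this example. The point is that the caller has to check for `null` before the value is unboxed.

```java
public class NullableTimestampSketch {

    /** Hypothetical context whose timestamp may be absent (null). */
    interface Context {
        Long timestamp();
    }

    /** Returns the record timestamp if present, else a caller-supplied fallback. */
    static long timestampOrDefault(Context context, long fallback) {
        Long ts = context.timestamp();
        // Check before unboxing: auto-unboxing a null Long throws NullPointerException.
        return ts != null ? ts : fallback;
    }

    public static void main(String[] args) {
        Context withTimestamp = () -> 1_000L;
        Context withoutTimestamp = () -> null;

        System.out.println(timestampOrDefault(withTimestamp, -1L));    // 1000
        System.out.println(timestampOrDefault(withoutTimestamp, -1L)); // -1
    }
}
```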


---

[GitHub] flink pull request #4616: [FLINK-7552] [FLINK-7553] Enhance SinkInterface / ...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4616#discussion_r137039841
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/SinkFunction.java ---
    @@ -35,6 +35,39 @@
     	 *
     	 * @param value The input record.
     	 * @throws Exception
    +	 * @deprecated Use {@link #invoke(SinkContext, Object)}.
     	 */
    -	void invoke(IN value) throws Exception;
    +	@Deprecated
    +	default void invoke(IN value) throws Exception {
    +	}
    +
    +	/**
    +	 * Writes the given value to the sink. This function is called for every record.
    +	 *
    +	 * @param context Additional context about the input record.
    +	 * @param value The input record.
    +	 * @throws Exception
    +	 */
    +	default void invoke(SinkContext context, IN value) throws Exception {
    --- End diff --
    
    Makes sense, I will change this. 👍 


---

[GitHub] flink issue #4616: [FLINK-7552] [FLINK-7553] Enhance SinkInterface / Use in ...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on the issue:

    https://github.com/apache/flink/pull/4616
  
    @pnowojski and @EronWright I changed the name of the `Context`, I added a test in `StreamSinkOperatorTest`, and I added methods for querying the current processing time/watermark to the context. I changed the `timestamp()` method to return a primitive `long` and I added a method `hasTimestamp()`. Also, `timestamp()` now throws an exception if no timestamp is available.
    
    What do you think?
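
A sketch of the variant described in this comment, under stated assumptions: the `Context` and `SimpleContext` types are stand-ins, and the choice of `IllegalStateException` is an assumption for the example, since the comment does not say which exception is thrown. The primitive `long` avoids boxing, and callers are expected to guard with `hasTimestamp()`.

```java
public class HasTimestampSketch {

    /** Hypothetical context with a primitive timestamp and a presence check. */
    interface Context {
        boolean hasTimestamp();

        long timestamp();
    }

    static final class SimpleContext implements Context {
        private boolean present;
        private long value;

        void set(long timestamp) {
            present = true;
            value = timestamp;
        }

        void clear() {
            present = false;
        }

        @Override
        public boolean hasTimestamp() {
            return present;
        }

        @Override
        public long timestamp() {
            if (!present) {
                // Exception type is an assumption made for this sketch.
                throw new IllegalStateException("Record has no timestamp.");
            }
            return value;
        }
    }

    /** Guarded access: only calls timestamp() when one is available. */
    static String describe(Context context) {
        return context.hasTimestamp() ? "ts=" + context.timestamp() : "no timestamp";
    }

    public static void main(String[] args) {
        SimpleContext context = new SimpleContext();
        System.out.println(describe(context)); // no timestamp

        context.set(5L);
        System.out.println(describe(context)); // ts=5
    }
}
```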


---

[GitHub] flink issue #4616: [FLINK-7552] [FLINK-7553] Enhance SinkInterface / Use in ...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on the issue:

    https://github.com/apache/flink/pull/4616
  
    To be honest, I didn't look at the tests because I didn't think they would fail. Sorry for that. 😓 I'll fix the tests and let you know once this is ready again.


---

[GitHub] flink pull request #4616: [FLINK-7552] [FLINK-7553] Enhance SinkInterface / ...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4616#discussion_r137205749
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/SinkFunction.java ---
    @@ -35,6 +35,39 @@
     	 *
     	 * @param value The input record.
     	 * @throws Exception
    +	 * @deprecated Use {@link #invoke(SinkContext, Object)}.
     	 */
    -	void invoke(IN value) throws Exception;
    +	@Deprecated
    +	default void invoke(IN value) throws Exception {
    +	}
    +
    +	/**
    +	 * Writes the given value to the sink. This function is called for every record.
    +	 *
    +	 * @param context Additional context about the input record.
    +	 * @param value The input record.
    +	 * @throws Exception
    +	 */
    +	default void invoke(SinkContext context, IN value) throws Exception {
    +		invoke(value);
    +	}
    +
    +	/**
    +	 * Context that {@link SinkFunction SinkFunctions } can use for getting additional data about
    +	 * an input record.
    +	 *
    +	 * <p>The context is only valid for the duration of a
    +	 * {@link SinkFunction#invoke(SinkContext, Object)} call. Do not store the context and use
    +	 * afterwards!
    +	 *
    +	 * @param <T> The type of elements accepted by the sink.
    +	 */
    +	@Public // Interface might be extended in the future with additional methods.
    +	interface SinkContext<T> {
    --- End diff --
    
    I'd like that as well but the approach won't work here because `SinkFunction` is an interface, i.e. I cannot have a `SinkFunction<T>.Context` because I can't use the generic parameter here.
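
The language rule behind this can be sketched as follows, with hypothetical `Sink` and `Processor` types that stand in for an interface and an abstract class. A type nested in an interface is implicitly static, so the outer type parameter is not in scope and the nested context has to declare its own. A non-static inner class of an abstract class can use the outer parameter directly.

```java
import java.lang.reflect.Modifier;

public class NestedContextSketch {

    /** Generic interface: its member types are implicitly static. */
    interface Sink<IN> {
        void invoke(IN value, Context<IN> context) throws Exception;

        // IN is not in scope inside a static member type, so the nested
        // interface declares its own type parameter T.
        interface Context<T> {
            long timestamp();
        }
    }

    /** Generic abstract class: a non-static inner class can see the outer parameter. */
    abstract static class Processor<IN> {
        abstract class Context {
            abstract IN current(); // legal: inner classes can refer to IN
        }
    }

    /** Reports whether a member type carries the static modifier. */
    static boolean isStaticMember(Class<?> type) {
        return Modifier.isStatic(type.getModifiers());
    }

    public static void main(String[] args) {
        System.out.println("Sink.Context static: " + isStaticMember(Sink.Context.class));
        System.out.println("Processor.Context static: " + isStaticMember(Processor.Context.class));
    }
}
```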


---

[GitHub] flink issue #4616: [FLINK-7552] [FLINK-7553] Enhance SinkInterface / Use in ...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on the issue:

    https://github.com/apache/flink/pull/4616
  
    The problem is that `RichSinkFunction` defines
    ```
    public abstract void invoke(IN value) throws Exception;
    ```
    
    even though it implements `SinkFunction`, which defines:
    ```
    void invoke(IN value) throws Exception;
    ```
    
    What this means is that code that is invoking these methods is different, depending on the type of reference. For example, in
    ```
    SinkFunction<String> interfaceSink = new SinkFunction<String>() {
        @Override
        public void invoke(String value) throws Exception {
    
        }
    };
    
    SinkFunction<String> interfaceSinkToRichSink = new RichSinkFunction<String>() {
        @Override
        public void invoke(String value) throws Exception {
    
        }
    };
    
    RichSinkFunction<String> richSink = new RichSinkFunction<String>() {
        @Override
        public void invoke(String value) throws Exception {
    
        }
    };
    
    interfaceSink.invoke("hello");
    interfaceSinkToRichSink.invoke("hello");
    richSink.invoke("hello");
    ```
    the first two calls are `invokeinterface` while the last call is using `invokevirtual`.
    
    I don't think that people use the sink interfaces to call methods on them directly, but this is the reason the binary compatibility checker is complaining.


---

[GitHub] flink pull request #4616: [FLINK-7552] [FLINK-7553] Enhance SinkInterface / ...

Posted by EronWright <gi...@git.apache.org>.
Github user EronWright commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4616#discussion_r136614448
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/SinkFunction.java ---
    @@ -35,6 +35,39 @@
     	 *
     	 * @param value The input record.
     	 * @throws Exception
    +	 * @deprecated Use {@link #invoke(SinkContext, Object)}.
     	 */
    -	void invoke(IN value) throws Exception;
    +	@Deprecated
    +	default void invoke(IN value) throws Exception {
    +	}
    +
    +	/**
    +	 * Writes the given value to the sink. This function is called for every record.
    +	 *
    +	 * @param context Additional context about the input record.
    +	 * @param value The input record.
    +	 * @throws Exception
    +	 */
    +	default void invoke(SinkContext context, IN value) throws Exception {
    --- End diff --
    
    Consider re-ordering the two arguments to more closely match the similar signature in `ProcessFunction`.
    
    ```
    default void invoke(IN value, SinkContext ctx) throws Exception
    ```
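    
    A minimal sketch of how the reordered signature could look together with the delegation to the deprecated method. `SketchSink`, its nested `Context`, and `SketchSinkDemo` are hypothetical stand-ins for illustration, not the actual Flink types:
    
    ```java
    import java.util.ArrayList;
    import java.util.List;
    
    // Hypothetical stand-in for SinkFunction; not the actual Flink interface.
    interface SketchSink<IN> {
    
        // Hypothetical stand-in for SinkContext.
        interface Context {
            long timestamp();
        }
    
        // Legacy single-argument method, kept so existing implementations still compile.
        @Deprecated
        default void invoke(IN value) throws Exception {
        }
    
        // New method with the value first, as in the suggested signature.
        // By default it delegates to the legacy method.
        default void invoke(IN value, Context ctx) throws Exception {
            invoke(value);
        }
    }
    
    class SketchSinkDemo {
    
        // Calls a legacy-style and a context-aware sink through the new method
        // and returns what each of them recorded.
        static List<String> run() {
            List<String> seen = new ArrayList<>();
    
            // Legacy-style sink: overrides only the deprecated method.
            SketchSink<String> legacy = new SketchSink<String>() {
                @Override
                public void invoke(String value) {
                    seen.add("legacy:" + value);
                }
            };
    
            // New-style sink: overrides the two-argument method and reads the timestamp.
            SketchSink<String> withContext = new SketchSink<String>() {
                @Override
                public void invoke(String value, Context ctx) {
                    seen.add("ctx:" + value + "@" + ctx.timestamp());
                }
            };
    
            SketchSink.Context ctx = () -> 42L;
            try {
                legacy.invoke("a", ctx);      // reaches the legacy override via the default method
                withContext.invoke("b", ctx); // uses the context directly
            } catch (Exception e) {
                throw new IllegalStateException(e);
            }
            return seen;
        }
    }
    ```
    
    With this shape, a caller only ever needs the two-argument method, and sinks written against the old method keep working unchanged.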


---

[GitHub] flink issue #4616: [FLINK-7552] [FLINK-7553] Enhance SinkInterface / Use in ...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on the issue:

    https://github.com/apache/flink/pull/4616
  
    @rmetzger the binary compatibility checker is acting up because it doesn't like the changes to `SinkFunction` and `RichSinkFunction`. It might be that we can't actually change this. What do you think?


---

[GitHub] flink pull request #4616: [FLINK-7552] [FLINK-7553] Enhance SinkInterface / ...

Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4616#discussion_r136063488
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSink.java ---
    @@ -31,14 +31,24 @@
     
     	private static final long serialVersionUID = 1L;
     
    +	private transient SimpleSinkContext sinkContext;
    +
     	public StreamSink(SinkFunction<IN> sinkFunction) {
     		super(sinkFunction);
     		chainingStrategy = ChainingStrategy.ALWAYS;
     	}
     
     	@Override
    +	public void open() throws Exception {
    +		super.open();
    +
    +		this.sinkContext = new SimpleSinkContext<>();
    +	}
    +
    +	@Override
     	public void processElement(StreamRecord<IN> element) throws Exception {
    -		userFunction.invoke(element.getValue());
    +		sinkContext.element = element;
    +		userFunction.invoke(sinkContext, element.getValue());
    --- End diff --
    
    Wouldn't it be better/simpler to just pass the `StreamRecord` to the `userFunction`?
    
    `userFunction.invoke(element)`
    
    That is, instead of adding `SinkContext` as the first argument of the `invoke` method in the sink interface, just change the element type from `IN` to `StreamRecord<IN>`?


---

[GitHub] flink issue #4616: [FLINK-7552] [FLINK-7553] Enhance SinkInterface / Use in ...

Posted by EronWright <gi...@git.apache.org>.
Github user EronWright commented on the issue:

    https://github.com/apache/flink/pull/4616
  
    +1


---

[GitHub] flink issue #4616: [FLINK-7552] [FLINK-7553] Enhance SinkInterface / Use in ...

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on the issue:

    https://github.com/apache/flink/pull/4616
  
    No, you can define fine-grained exclusions. Check the excludes in the example: http://siom79.github.io/japicmp/MavenPlugin.html


---

[GitHub] flink issue #4616: [FLINK-7552] [FLINK-7553] Enhance SinkInterface / Use in ...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on the issue:

    https://github.com/apache/flink/pull/4616
  
    Thanks for the reviews, everyone! 😃 


---

[GitHub] flink pull request #4616: [FLINK-7552] [FLINK-7553] Enhance SinkInterface / ...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4616#discussion_r137206502
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/SinkFunction.java ---
    @@ -35,6 +35,39 @@
     	 *
     	 * @param value The input record.
     	 * @throws Exception
    +	 * @deprecated Use {@link #invoke(SinkContext, Object)}.
     	 */
    -	void invoke(IN value) throws Exception;
    +	@Deprecated
    +	default void invoke(IN value) throws Exception {
    +	}
    +
    +	/**
    +	 * Writes the given value to the sink. This function is called for every record.
    +	 *
    +	 * @param context Additional context about the input record.
    +	 * @param value The input record.
    +	 * @throws Exception
    +	 */
    +	default void invoke(SinkContext context, IN value) throws Exception {
    +		invoke(value);
    +	}
    +
    +	/**
    +	 * Context that {@link SinkFunction SinkFunctions } can use for getting additional data about
    +	 * an input record.
    +	 *
    +	 * <p>The context is only valid for the duration of a
    +	 * {@link SinkFunction#invoke(SinkContext, Object)} call. Do not store the context and use
    +	 * afterwards!
    +	 *
    +	 * @param <T> The type of elements accepted by the sink.
    +	 */
    +	@Public // Interface might be extended in the future with additional methods.
    +	interface SinkContext<T> {
    +
    +		/**
    +		 * Returns the timestamp of the current input record.
    +		 */
    +		long timestamp();
    --- End diff --
    
    @StephanEwen What do you think about this? You introduced the code in `StreamRecord` that returns `Long.MIN_VALUE` when there is no timestamp instead of throwing an exception.
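    
    For illustration, the two behaviours under discussion could be sketched roughly as follows. `SketchRecord` and both accessor names are hypothetical and are not part of Flink; only the `Long.MIN_VALUE` sentinel is taken from the comment above:
    
    ```java
    // Hypothetical stand-in for an internal record type; not Flink's StreamRecord.
    final class SketchRecord<T> {
        private final T value;
        private final long timestamp;
        private final boolean hasTimestamp;
    
        // Creates a record without a timestamp.
        SketchRecord(T value) {
            this(value, 0L, false);
        }
    
        // Creates a record with a timestamp.
        SketchRecord(T value, long timestamp) {
            this(value, timestamp, true);
        }
    
        private SketchRecord(T value, long timestamp, boolean hasTimestamp) {
            this.value = value;
            this.timestamp = timestamp;
            this.hasTimestamp = hasTimestamp;
        }
    
        T getValue() {
            return value;
        }
    
        // Option 1: return a sentinel when no timestamp is set.
        long timestampOrSentinel() {
            return hasTimestamp ? timestamp : Long.MIN_VALUE;
        }
    
        // Option 2: fail loudly when no timestamp is set.
        long timestampOrThrow() {
            if (!hasTimestamp) {
                throw new IllegalStateException("Record has no timestamp.");
            }
            return timestamp;
        }
    }
    ```
    
    The sentinel keeps the per-record path free of exceptions, while the throwing variant makes a missing timestamp harder to overlook; which of the two a sink context should expose is exactly the open question here.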


---

[GitHub] flink pull request #4616: [FLINK-7552] [FLINK-7553] Enhance SinkInterface / ...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4616#discussion_r136279538
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSink.java ---
    @@ -31,14 +31,24 @@
     
     	private static final long serialVersionUID = 1L;
     
    +	private transient SimpleSinkContext sinkContext;
    +
     	public StreamSink(SinkFunction<IN> sinkFunction) {
     		super(sinkFunction);
     		chainingStrategy = ChainingStrategy.ALWAYS;
     	}
     
     	@Override
    +	public void open() throws Exception {
    +		super.open();
    +
    +		this.sinkContext = new SimpleSinkContext<>();
    +	}
    +
    +	@Override
     	public void processElement(StreamRecord<IN> element) throws Exception {
    -		userFunction.invoke(element.getValue());
    +		sinkContext.element = element;
    +		userFunction.invoke(sinkContext, element.getValue());
    --- End diff --
    
    `StreamRecord` is an internal concept that should not be exposed in any user-facing API. That's the reason for the extra context. Plus, the context allows us to extend the information that we pass to the `SinkFunction` in the future, similar to the context in `ProcessFunction` and `ProcessWindowFunction`.
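    
    A rough sketch of the idea, assuming a reusable context object that wraps the internal record, as in the diff above. All type names here (`InternalRecordSketch`, `UserSinkSketch`, `SinkOperatorSketch`, `ContextSketchDemo`) are hypothetical and only mirror the shape of the real classes:
    
    ```java
    import java.util.ArrayList;
    import java.util.List;
    
    // Hypothetical internal record type that user code should never see.
    final class InternalRecordSketch<T> {
        final T value;
        final long timestamp;
    
        InternalRecordSketch(T value, long timestamp) {
            this.value = value;
            this.timestamp = timestamp;
        }
    }
    
    // Hypothetical user-facing sink API: only the value and a narrow context are exposed.
    interface UserSinkSketch<T> {
    
        // Can grow new methods later without touching the internal record type.
        interface Context {
            long timestamp();
        }
    
        // Argument order follows the diff above (context first).
        void invoke(Context ctx, T value);
    }
    
    // Hypothetical operator: reuses one context object and re-points it at each record.
    final class SinkOperatorSketch<T> {
        private final UserSinkSketch<T> sink;
        private final ReusableContext<T> context = new ReusableContext<>();
    
        SinkOperatorSketch(UserSinkSketch<T> sink) {
            this.sink = sink;
        }
    
        void processElement(InternalRecordSketch<T> element) {
            context.element = element; // only valid for the duration of this call
            sink.invoke(context, element.value);
        }
    
        private static final class ReusableContext<T> implements UserSinkSketch.Context {
            InternalRecordSketch<T> element;
    
            @Override
            public long timestamp() {
                return element.timestamp;
            }
        }
    }
    
    final class ContextSketchDemo {
    
        // Pushes two records through the operator and returns what the sink observed.
        static List<String> run() {
            List<String> out = new ArrayList<>();
            UserSinkSketch<String> sink = (ctx, value) -> out.add(value + "@" + ctx.timestamp());
            SinkOperatorSketch<String> operator = new SinkOperatorSketch<>(sink);
            operator.processElement(new InternalRecordSketch<>("a", 1L));
            operator.processElement(new InternalRecordSketch<>("b", 2L));
            return out;
        }
    }
    ```
    
    The sink never references the record type, so the internal representation can change freely, and because the context is reused, no extra object is allocated per record.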


---