Posted to issues@flink.apache.org by "Aljoscha Krettek (JIRA)" <ji...@apache.org> on 2017/08/29 13:47:00 UTC
[jira] [Created] (FLINK-7552) Extend SinkFunction interface with SinkContext
Aljoscha Krettek created FLINK-7552:
---------------------------------------
Summary: Extend SinkFunction interface with SinkContext
Key: FLINK-7552
URL: https://issues.apache.org/jira/browse/FLINK-7552
Project: Flink
Issue Type: Bug
Components: DataStream API
Reporter: Aljoscha Krettek
Assignee: Aljoscha Krettek
Fix For: 1.4.0
Now that we require Java 8, we can use default methods to extend the {{SinkFunction}} interface without breaking backwards compatibility. I'm proposing this:
{code}
/**
 * Interface for implementing user defined sink functionality.
 *
 * @param <IN> Input type parameter.
 */
@Public
public interface SinkFunction<IN> extends Function, Serializable {

    /**
     * Function for standard sink behaviour. This function is called for every record.
     *
     * @param value The input record.
     * @throws Exception
     * @deprecated Use {@link #invoke(SinkContext, Object)}.
     */
    @Deprecated
    default void invoke(IN value) throws Exception {
    }

    /**
     * Writes the given value to the sink. This function is called for every record.
     *
     * @param context Additional context about the input record.
     * @param value The input record.
     * @throws Exception
     */
    default void invoke(SinkContext context, IN value) throws Exception {
        invoke(value);
    }

    /**
     * Context that {@link SinkFunction SinkFunctions} can use for getting additional data about
     * an input record.
     *
     * @param <T> The type of elements accepted by the sink.
     */
    @Public // Interface might be extended in the future with additional methods.
    interface SinkContext<T> {

        /**
         * Returns the timestamp of the current input record.
         */
        long timestamp();
    }
}
{code}
For now, this only allows access to the element timestamp. This would allow us to fix the abomination that is {{FlinkKafkaProducer010}}, which is a hybrid {{SinkFunction}}/{{StreamOperator}} only because it needs access to timestamps.
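To illustrate how the two {{invoke}} variants would interact, here is a minimal, self-contained sketch. It is not Flink code: the interface below is a stripped-down stand-in for the proposal (no {{Function}}/{{Serializable}}, no annotations, and the context parameter is written as {{SinkContext<IN>}} rather than the raw type), and {{LegacySink}}, {{TimestampedSink}}, {{SinkContextSketch}} and {{emit}} are hypothetical names that only play the role of user sinks and of the runtime calling them.

```java
import java.util.ArrayList;
import java.util.List;

// Stand-in for the proposed interface, reduced to what the sketch needs.
interface SinkFunction<IN> {

    // Old entry point; existing sinks override this one.
    @Deprecated
    default void invoke(IN value) throws Exception {
    }

    // New entry point; by default it delegates to the old method.
    default void invoke(SinkContext<IN> context, IN value) throws Exception {
        invoke(value);
    }

    interface SinkContext<T> {
        long timestamp();
    }
}

// Hypothetical pre-existing sink: overrides only the old method.
class LegacySink implements SinkFunction<String> {
    final List<String> written = new ArrayList<>();

    @Override
    public void invoke(String value) {
        written.add(value);
    }
}

// Hypothetical new-style sink: reads the element timestamp from the context.
class TimestampedSink implements SinkFunction<String> {
    final List<String> written = new ArrayList<>();

    @Override
    public void invoke(SinkContext<String> context, String value) {
        written.add(context.timestamp() + ":" + value);
    }
}

class SinkContextSketch {

    // Plays the role of the runtime: always calls the context-aware method.
    static <T> void emit(SinkFunction<T> sink, T value, long timestamp) {
        try {
            sink.invoke(() -> timestamp, value);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        LegacySink legacy = new LegacySink();
        TimestampedSink timestamped = new TimestampedSink();

        emit(legacy, "a", 42L);
        emit(timestamped, "a", 42L);

        System.out.println(legacy.written);      // prints [a]
        System.out.println(timestamped.written); // prints [42:a]
    }
}
```

The caller only ever uses the two-argument method. A sink that predates the change keeps working because the default implementation forwards to its {{invoke(value)}}, while a new sink overrides the context-aware method and gets the timestamp directly.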