You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2017/09/21 11:47:01 UTC
[2/2] flink git commit: [FLINK-7553] Use new SinkFunction interface
in FlinkKafkaProducer010
[FLINK-7553] Use new SinkFunction interface in FlinkKafkaProducer010
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6886f638
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6886f638
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6886f638
Branch: refs/heads/master
Commit: 6886f638d70419e01ebdfc1cdbb6d834f3fb30b0
Parents: e7996b0
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Tue Aug 29 15:53:16 2017 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Thu Sep 21 13:46:39 2017 +0200
----------------------------------------------------------------------
.../connectors/kafka/FlinkKafkaProducer010.java | 226 +++++--------------
.../kafka/FlinkKafkaProducerBase.java | 2 +-
.../connectors/kafka/KafkaProducerTestBase.java | 3 +
.../api/functions/sink/SinkContextUtil.java | 7 +-
.../api/functions/sink/SinkFunction.java | 26 +--
.../streaming/api/operators/StreamSink.java | 16 +-
.../api/operators/StreamSinkOperatorTest.java | 3 +-
7 files changed, 81 insertions(+), 202 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/6886f638/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java
index 3b9dff1..3b43a7e 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java
@@ -17,24 +17,13 @@
package org.apache.flink.streaming.connectors.kafka;
-import org.apache.flink.api.common.functions.IterationRuntimeContext;
-import org.apache.flink.api.common.functions.RichFunction;
-import org.apache.flink.api.common.functions.RuntimeContext;
-import org.apache.flink.api.java.typeutils.GenericTypeInfo;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.state.FunctionInitializationContext;
-import org.apache.flink.runtime.state.FunctionSnapshotContext;
-import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
-import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
-import org.apache.flink.streaming.api.operators.StreamSink;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaDelegatePartitioner;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
import org.apache.flink.streaming.util.serialization.SerializationSchema;
@@ -43,32 +32,10 @@ import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
-import static org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.getPartitionsByTopic;
-import static org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.getPropertiesFromBrokerList;
-
/**
* Flink Sink to produce data into a Kafka topic. This producer is compatible with Kafka 0.10.x
- *
- * <p>Implementation note: This producer is a hybrid between a regular regular sink function (a)
- * and a custom operator (b).
- *
- * <p>For (a), the class implements the SinkFunction and RichFunction interfaces.
- * For (b), it extends the StreamTask class.
- *
- * <p>Details about approach (a):
- * Pre Kafka 0.10 producers only follow approach (a), allowing users to use the producer using the
- * DataStream.addSink() method.
- * Since the APIs exposed in that variant do not allow accessing the the timestamp attached to the record
- * the Kafka 0.10 producer has a second invocation option, approach (b).
- *
- * <p>Details about approach (b):
- * Kafka 0.10 supports writing the timestamp attached to a record to Kafka. When adding the
- * FlinkKafkaProducer010 using the FlinkKafkaProducer010.writeToKafkaWithTimestamps() method, the Kafka producer
- * can access the internal record timestamp of the record and write it to Kafka.
- *
- * <p>All methods and constructors in this class are marked with the approach they are needed for.
*/
-public class FlinkKafkaProducer010<T> extends StreamSink<T> implements SinkFunction<T>, RichFunction, CheckpointedFunction {
+public class FlinkKafkaProducer010<T> extends FlinkKafkaProducer09<T> {
/**
* Flag controlling whether we are writing the Flink record's timestamp into Kafka.
@@ -87,7 +54,11 @@ public class FlinkKafkaProducer010<T> extends StreamSink<T> implements SinkFunct
* @param topicId ID of the Kafka topic.
* @param serializationSchema User defined serialization schema supporting key/value messages
* @param producerConfig Properties with the producer configuration.
+ *
+ * @deprecated Use {@link #FlinkKafkaProducer010(String, KeyedSerializationSchema, Properties)}
+ * and call {@link #setWriteTimestampToKafka(boolean)}.
*/
+ @Deprecated
public static <T> FlinkKafkaProducer010Configuration<T> writeToKafkaWithTimestamps(DataStream<T> inStream,
String topicId,
KeyedSerializationSchema<T> serializationSchema,
@@ -105,7 +76,11 @@ public class FlinkKafkaProducer010<T> extends StreamSink<T> implements SinkFunct
* @param topicId ID of the Kafka topic.
* @param serializationSchema User defined (keyless) serialization schema.
* @param producerConfig Properties with the producer configuration.
+ *
+ * @deprecated Use {@link #FlinkKafkaProducer010(String, KeyedSerializationSchema, Properties)}
+ * and call {@link #setWriteTimestampToKafka(boolean)}.
*/
+ @Deprecated
public static <T> FlinkKafkaProducer010Configuration<T> writeToKafkaWithTimestamps(DataStream<T> inStream,
String topicId,
SerializationSchema<T> serializationSchema,
@@ -124,20 +99,24 @@ public class FlinkKafkaProducer010<T> extends StreamSink<T> implements SinkFunct
* @param serializationSchema A serializable serialization schema for turning user objects into a kafka-consumable byte[] supporting key/value messages
* @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is the only required argument.
* @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions.
+ *
+ * @deprecated Use {@link #FlinkKafkaProducer010(String, KeyedSerializationSchema, Properties, FlinkKafkaPartitioner)}
+ * and call {@link #setWriteTimestampToKafka(boolean)}.
*/
+ @Deprecated
public static <T> FlinkKafkaProducer010Configuration<T> writeToKafkaWithTimestamps(DataStream<T> inStream,
String topicId,
KeyedSerializationSchema<T> serializationSchema,
Properties producerConfig,
FlinkKafkaPartitioner<T> customPartitioner) {
- GenericTypeInfo<Object> objectTypeInfo = new GenericTypeInfo<>(Object.class);
FlinkKafkaProducer010<T> kafkaProducer = new FlinkKafkaProducer010<>(topicId, serializationSchema, producerConfig, customPartitioner);
- SingleOutputStreamOperator<Object> transformation = inStream.transform("FlinKafkaProducer 0.10.x", objectTypeInfo, kafkaProducer);
- return new FlinkKafkaProducer010Configuration<>(transformation, kafkaProducer);
+ DataStreamSink<T> streamSink = inStream.addSink(kafkaProducer);
+ return new FlinkKafkaProducer010Configuration<>(streamSink, inStream, kafkaProducer);
+
}
- // ---------------------- Regular constructors w/o timestamp support ------------------
+ // ---------------------- Regular constructors ------------------
/**
* Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to
@@ -220,9 +199,7 @@ public class FlinkKafkaProducer010<T> extends StreamSink<T> implements SinkFunct
* <p>This constructor does not allow writing timestamps to Kafka, it follow approach (a) (see above)
*/
public FlinkKafkaProducer010(String topicId, KeyedSerializationSchema<T> serializationSchema, Properties producerConfig, FlinkKafkaPartitioner<T> customPartitioner) {
- // We create a Kafka 09 producer instance here and only "override" (by intercepting) the
- // invoke call.
- super(new FlinkKafkaProducer09<>(topicId, serializationSchema, producerConfig, customPartitioner));
+ super(topicId, serializationSchema, producerConfig, customPartitioner);
}
// ----------------------------- Deprecated constructors / factory methods ---------------------------
@@ -250,11 +227,10 @@ public class FlinkKafkaProducer010<T> extends StreamSink<T> implements SinkFunct
Properties producerConfig,
KafkaPartitioner<T> customPartitioner) {
- GenericTypeInfo<Object> objectTypeInfo = new GenericTypeInfo<>(Object.class);
FlinkKafkaProducer010<T> kafkaProducer =
new FlinkKafkaProducer010<>(topicId, serializationSchema, producerConfig, new FlinkKafkaDelegatePartitioner<>(customPartitioner));
- SingleOutputStreamOperator<Object> transformation = inStream.transform("FlinKafkaProducer 0.10.x", objectTypeInfo, kafkaProducer);
- return new FlinkKafkaProducer010Configuration<>(transformation, kafkaProducer);
+ DataStreamSink<T> streamSink = inStream.addSink(kafkaProducer);
+ return new FlinkKafkaProducer010Configuration<T>(streamSink, inStream, kafkaProducer);
}
/**
@@ -288,157 +264,75 @@ public class FlinkKafkaProducer010<T> extends StreamSink<T> implements SinkFunct
public FlinkKafkaProducer010(String topicId, KeyedSerializationSchema<T> serializationSchema, Properties producerConfig, KafkaPartitioner<T> customPartitioner) {
// We create a Kafka 09 producer instance here and only "override" (by intercepting) the
// invoke call.
- super(new FlinkKafkaProducer09<>(topicId, serializationSchema, producerConfig, new FlinkKafkaDelegatePartitioner<>(customPartitioner)));
+ super(topicId, serializationSchema, producerConfig, customPartitioner);
}
- // ----------------------------- Generic element processing ---------------------------
+ /**
+ * If set to true, Flink will write the (event time) timestamp attached to each record into Kafka.
+ * Timestamps must be positive for Kafka to accept them.
+ *
+ * @param writeTimestampToKafka Flag indicating if Flink's internal timestamps are written to Kafka.
+ */
+ public void setWriteTimestampToKafka(boolean writeTimestampToKafka) {
+ this.writeTimestampToKafka = writeTimestampToKafka;
+ }
- private void invokeInternal(T next, long elementTimestamp) throws Exception {
- final FlinkKafkaProducerBase<T> internalProducer = (FlinkKafkaProducerBase<T>) userFunction;
+ // ----------------------------- Generic element processing ---------------------------
- internalProducer.checkErroneous();
+ @Override
+ public void invoke(T value, Context context) throws Exception {
- byte[] serializedKey = internalProducer.schema.serializeKey(next);
- byte[] serializedValue = internalProducer.schema.serializeValue(next);
- String targetTopic = internalProducer.schema.getTargetTopic(next);
+ checkErroneous();
+
+ byte[] serializedKey = schema.serializeKey(value);
+ byte[] serializedValue = schema.serializeValue(value);
+ String targetTopic = schema.getTargetTopic(value);
if (targetTopic == null) {
- targetTopic = internalProducer.defaultTopicId;
+ targetTopic = defaultTopicId;
}
Long timestamp = null;
if (this.writeTimestampToKafka) {
- timestamp = elementTimestamp;
+ timestamp = context.timestamp();
}
ProducerRecord<byte[], byte[]> record;
- int[] partitions = internalProducer.topicPartitionsMap.get(targetTopic);
+ int[] partitions = topicPartitionsMap.get(targetTopic);
if (null == partitions) {
- partitions = getPartitionsByTopic(targetTopic, internalProducer.producer);
- internalProducer.topicPartitionsMap.put(targetTopic, partitions);
+ partitions = getPartitionsByTopic(targetTopic, producer);
+ topicPartitionsMap.put(targetTopic, partitions);
}
- if (internalProducer.flinkKafkaPartitioner == null) {
+ if (flinkKafkaPartitioner == null) {
record = new ProducerRecord<>(targetTopic, null, timestamp, serializedKey, serializedValue);
} else {
- record = new ProducerRecord<>(targetTopic, internalProducer.flinkKafkaPartitioner.partition(next, serializedKey, serializedValue, targetTopic, partitions), timestamp, serializedKey, serializedValue);
+ record = new ProducerRecord<>(targetTopic, flinkKafkaPartitioner.partition(value, serializedKey, serializedValue, targetTopic, partitions), timestamp, serializedKey, serializedValue);
}
- if (internalProducer.flushOnCheckpoint) {
- synchronized (internalProducer.pendingRecordsLock) {
- internalProducer.pendingRecords++;
+ if (flushOnCheckpoint) {
+ synchronized (pendingRecordsLock) {
+ pendingRecords++;
}
}
- internalProducer.producer.send(record, internalProducer.callback);
- }
-
- // ----------------- Helper methods implementing methods from SinkFunction and RichFunction (Approach (a)) ----
-
- // ---- Configuration setters
-
- /**
- * Defines whether the producer should fail on errors, or only log them.
- * If this is set to true, then exceptions will be only logged, if set to false,
- * exceptions will be eventually thrown and cause the streaming program to
- * fail (and enter recovery).
- *
- * <p>Method is only accessible for approach (a) (see above)
- *
- * @param logFailuresOnly The flag to indicate logging-only on exceptions.
- */
- public void setLogFailuresOnly(boolean logFailuresOnly) {
- final FlinkKafkaProducerBase<T> internalProducer = (FlinkKafkaProducerBase<T>) userFunction;
- internalProducer.setLogFailuresOnly(logFailuresOnly);
- }
-
- /**
- * If set to true, the Flink producer will wait for all outstanding messages in the Kafka buffers
- * to be acknowledged by the Kafka producer on a checkpoint.
- * This way, the producer can guarantee that messages in the Kafka buffers are part of the checkpoint.
- *
- * <p>Method is only accessible for approach (a) (see above)
- *
- * @param flush Flag indicating the flushing mode (true = flush on checkpoint)
- */
- public void setFlushOnCheckpoint(boolean flush) {
- final FlinkKafkaProducerBase<T> internalProducer = (FlinkKafkaProducerBase<T>) userFunction;
- internalProducer.setFlushOnCheckpoint(flush);
- }
-
- /**
- * This method is used for approach (a) (see above).
- */
- @Override
- public void open(Configuration parameters) throws Exception {
- final FlinkKafkaProducerBase<T> internalProducer = (FlinkKafkaProducerBase<T>) userFunction;
- internalProducer.open(parameters);
- }
-
- /**
- * This method is used for approach (a) (see above).
- */
- @Override
- public IterationRuntimeContext getIterationRuntimeContext() {
- final FlinkKafkaProducerBase<T> internalProducer = (FlinkKafkaProducerBase<T>) userFunction;
- return internalProducer.getIterationRuntimeContext();
- }
-
- /**
- * This method is used for approach (a) (see above).
- */
- @Override
- public void setRuntimeContext(RuntimeContext t) {
- final FlinkKafkaProducerBase<T> internalProducer = (FlinkKafkaProducerBase<T>) userFunction;
- internalProducer.setRuntimeContext(t);
- }
-
- /**
- * Invoke method for using the Sink as DataStream.addSink() sink.
- *
- * <p>This method is used for approach (a) (see above)
- *
- * @param value The input record.
- */
- @Override
- public void invoke(T value) throws Exception {
- invokeInternal(value, Long.MAX_VALUE);
- }
-
- // ----------------- Helper methods and classes implementing methods from StreamSink (Approach (b)) ----
-
- /**
- * Process method for using the sink with timestamp support.
- *
- * <p>This method is used for approach (b) (see above)
- */
- @Override
- public void processElement(StreamRecord<T> element) throws Exception {
- invokeInternal(element.getValue(), element.getTimestamp());
- }
-
- @Override
- public void initializeState(FunctionInitializationContext context) throws Exception {
- final FlinkKafkaProducerBase<T> internalProducer = (FlinkKafkaProducerBase<T>) userFunction;
- internalProducer.initializeState(context);
- }
-
- @Override
- public void snapshotState(FunctionSnapshotContext context) throws Exception {
- final FlinkKafkaProducerBase<T> internalProducer = (FlinkKafkaProducerBase<T>) userFunction;
- internalProducer.snapshotState(context);
+ producer.send(record, callback);
}
/**
* Configuration object returned by the writeToKafkaWithTimestamps() call.
+ *
+ * <p>This is only kept because it's part of the public API. It is not necessary anymore, now
+ * that the {@link SinkFunction} interface provides timestamps.
*/
public static class FlinkKafkaProducer010Configuration<T> extends DataStreamSink<T> {
- private final FlinkKafkaProducerBase wrappedProducerBase;
private final FlinkKafkaProducer010 producer;
- private FlinkKafkaProducer010Configuration(DataStream stream, FlinkKafkaProducer010<T> producer) {
+ private FlinkKafkaProducer010Configuration(
+ DataStreamSink originalSink,
+ DataStream<T> inputStream,
+ FlinkKafkaProducer010<T> producer) {
//noinspection unchecked
- super(stream, producer);
+ super(inputStream, originalSink.getTransformation().getOperator());
this.producer = producer;
- this.wrappedProducerBase = (FlinkKafkaProducerBase) producer.userFunction;
}
/**
@@ -450,7 +344,7 @@ public class FlinkKafkaProducer010<T> extends StreamSink<T> implements SinkFunct
* @param logFailuresOnly The flag to indicate logging-only on exceptions.
*/
public void setLogFailuresOnly(boolean logFailuresOnly) {
- this.wrappedProducerBase.setLogFailuresOnly(logFailuresOnly);
+ producer.setLogFailuresOnly(logFailuresOnly);
}
/**
@@ -461,7 +355,7 @@ public class FlinkKafkaProducer010<T> extends StreamSink<T> implements SinkFunct
* @param flush Flag indicating the flushing mode (true = flush on checkpoint)
*/
public void setFlushOnCheckpoint(boolean flush) {
- this.wrappedProducerBase.setFlushOnCheckpoint(flush);
+ producer.setFlushOnCheckpoint(flush);
}
/**
@@ -471,7 +365,7 @@ public class FlinkKafkaProducer010<T> extends StreamSink<T> implements SinkFunct
* @param writeTimestampToKafka Flag indicating if Flink's internal timestamps are written to Kafka.
*/
public void setWriteTimestampToKafka(boolean writeTimestampToKafka) {
- this.producer.writeTimestampToKafka = writeTimestampToKafka;
+ producer.writeTimestampToKafka = writeTimestampToKafka;
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/6886f638/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java
index 76a2f84..befc1a1 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java
@@ -276,7 +276,7 @@ public abstract class FlinkKafkaProducerBase<IN> extends RichSinkFunction<IN> im
* The incoming data
*/
@Override
- public void invoke(IN next) throws Exception {
+ public void invoke(IN next, Context context) throws Exception {
// propagate asynchronous errors
checkErroneous();
http://git-wip-us.apache.org/repos/asf/flink/blob/6886f638/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
index 000de52..35607dd 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
@@ -215,6 +215,9 @@ public abstract class KafkaProducerTestBase extends KafkaTestBase {
* This test sets KafkaProducer so that it will not automatically flush the data and
* simulate network failure between Flink and Kafka to check whether FlinkKafkaProducer
* flushed records manually on snapshotState.
+ *
+ * <p>For legacy reasons there are two different ways of instantiating a Kafka 0.10 sink. The
+ * {@code regularSink} parameter controls which of them is used.
*/
protected void testOneToOneAtLeastOnce(boolean regularSink) throws Exception {
final String topic = regularSink ? "oneToOneTopicRegularSink" : "oneToOneTopicCustomOperator";
http://git-wip-us.apache.org/repos/asf/flink/blob/6886f638/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/SinkContextUtil.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/SinkContextUtil.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/SinkContextUtil.java
index 2749560..3b02ad0 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/SinkContextUtil.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/SinkContextUtil.java
@@ -43,14 +43,9 @@ public class SinkContextUtil {
}
@Override
- public long timestamp() {
+ public Long timestamp() {
return timestamp;
}
-
- @Override
- public boolean hasTimestamp() {
- return true;
- }
};
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/6886f638/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/SinkFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/SinkFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/SinkFunction.java
index 15a77c4..74870bc 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/SinkFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/SinkFunction.java
@@ -31,22 +31,22 @@ import java.io.Serializable;
public interface SinkFunction<IN> extends Function, Serializable {
/**
- * Function for standard sink behaviour. This function is called for every record.
- *
- * @param value The input record.
- * @throws Exception
* @deprecated Use {@link #invoke(Object, Context)}.
*/
@Deprecated
- default void invoke(IN value) throws Exception {
- }
+ default void invoke(IN value) throws Exception {}
/**
* Writes the given value to the sink. This function is called for every record.
*
+ * <p>You have to override this method when implementing a {@code SinkFunction}; it is a
+ * {@code default} method only for backward compatibility with the old-style method.
+ *
* @param value The input record.
* @param context Additional context about the input record.
- * @throws Exception
+ *
+ * @throws Exception This method may throw exceptions. Throwing an exception will cause the operation
+ * to fail and may trigger recovery.
*/
default void invoke(IN value, Context context) throws Exception {
invoke(value);
@@ -72,15 +72,9 @@ public interface SinkFunction<IN> extends Function, Serializable {
long currentWatermark();
/**
- * Returns the timestamp of the current input record.
- */
- long timestamp();
-
- /**
- * Checks whether this record has a timestamp.
- *
- * @return True if the record has a timestamp, false if not.
+ * Returns the timestamp of the current input record or {@code null} if the element does not
+ * have an assigned timestamp.
*/
- boolean hasTimestamp();
+ Long timestamp();
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/6886f638/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSink.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSink.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSink.java
index f4b09af..667e130 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSink.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSink.java
@@ -91,19 +91,11 @@ public class StreamSink<IN> extends AbstractUdfStreamOperator<Object, SinkFuncti
}
@Override
- public long timestamp() {
- if (!element.hasTimestamp()) {
- throw new IllegalStateException(
- "Record has no timestamp. Is the time characteristic set to 'ProcessingTime', or " +
- "did you forget to call 'DataStream.assignTimestampsAndWatermarks(...)'?");
-
+ public Long timestamp() {
+ if (element.hasTimestamp()) {
+ return element.getTimestamp();
}
- return element.getTimestamp();
- }
-
- public boolean hasTimestamp() {
- return element.hasTimestamp();
+ return null;
}
-
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/6886f638/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamSinkOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamSinkOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamSinkOperatorTest.java
index 500a52a..9085ade 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamSinkOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamSinkOperatorTest.java
@@ -96,7 +96,8 @@ public class StreamSinkOperatorTest extends TestLogger {
@Override
public void invoke(
T value, Context context) throws Exception {
- if (context.hasTimestamp()) {
+ Long timestamp = context.timestamp();
+ if (timestamp != null) {
data.add(
new Tuple4<>(
context.currentWatermark(),