You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2017/09/21 11:47:01 UTC

[2/2] flink git commit: [FLINK-7553] Use new SinkFunction interface in FlinkKafkaProducer010

[FLINK-7553] Use new SinkFunction interface in FlinkKafkaProducer010


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6886f638
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6886f638
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6886f638

Branch: refs/heads/master
Commit: 6886f638d70419e01ebdfc1cdbb6d834f3fb30b0
Parents: e7996b0
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Tue Aug 29 15:53:16 2017 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Thu Sep 21 13:46:39 2017 +0200

----------------------------------------------------------------------
 .../connectors/kafka/FlinkKafkaProducer010.java | 226 +++++--------------
 .../kafka/FlinkKafkaProducerBase.java           |   2 +-
 .../connectors/kafka/KafkaProducerTestBase.java |   3 +
 .../api/functions/sink/SinkContextUtil.java     |   7 +-
 .../api/functions/sink/SinkFunction.java        |  26 +--
 .../streaming/api/operators/StreamSink.java     |  16 +-
 .../api/operators/StreamSinkOperatorTest.java   |   3 +-
 7 files changed, 81 insertions(+), 202 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/6886f638/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java
index 3b9dff1..3b43a7e 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java
@@ -17,24 +17,13 @@
 
 package org.apache.flink.streaming.connectors.kafka;
 
-import org.apache.flink.api.common.functions.IterationRuntimeContext;
-import org.apache.flink.api.common.functions.RichFunction;
-import org.apache.flink.api.common.functions.RuntimeContext;
-import org.apache.flink.api.java.typeutils.GenericTypeInfo;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.state.FunctionInitializationContext;
-import org.apache.flink.runtime.state.FunctionSnapshotContext;
-import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSink;
-import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
-import org.apache.flink.streaming.api.operators.StreamSink;
 import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
 import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaDelegatePartitioner;
 import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
 import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
 import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
 import org.apache.flink.streaming.util.serialization.SerializationSchema;
@@ -43,32 +32,10 @@ import org.apache.kafka.clients.producer.ProducerRecord;
 
 import java.util.Properties;
 
-import static org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.getPartitionsByTopic;
-import static org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.getPropertiesFromBrokerList;
-
 /**
  * Flink Sink to produce data into a Kafka topic. This producer is compatible with Kafka 0.10.x
- *
- * <p>Implementation note: This producer is a hybrid between a regular regular sink function (a)
- * and a custom operator (b).
- *
- * <p>For (a), the class implements the SinkFunction and RichFunction interfaces.
- * For (b), it extends the StreamTask class.
- *
- * <p>Details about approach (a):
- *  Pre Kafka 0.10 producers only follow approach (a), allowing users to use the producer using the
- *  DataStream.addSink() method.
- *  Since the APIs exposed in that variant do not allow accessing the the timestamp attached to the record
- *  the Kafka 0.10 producer has a second invocation option, approach (b).
- *
- * <p>Details about approach (b):
- *  Kafka 0.10 supports writing the timestamp attached to a record to Kafka. When adding the
- *  FlinkKafkaProducer010 using the FlinkKafkaProducer010.writeToKafkaWithTimestamps() method, the Kafka producer
- *  can access the internal record timestamp of the record and write it to Kafka.
- *
- * <p>All methods and constructors in this class are marked with the approach they are needed for.
  */
-public class FlinkKafkaProducer010<T> extends StreamSink<T> implements SinkFunction<T>, RichFunction, CheckpointedFunction {
+public class FlinkKafkaProducer010<T> extends FlinkKafkaProducer09<T> {
 
 	/**
 	 * Flag controlling whether we are writing the Flink record's timestamp into Kafka.
@@ -87,7 +54,11 @@ public class FlinkKafkaProducer010<T> extends StreamSink<T> implements SinkFunct
 	 * @param topicId ID of the Kafka topic.
 	 * @param serializationSchema User defined serialization schema supporting key/value messages
 	 * @param producerConfig Properties with the producer configuration.
+	 *
+	 * @deprecated Use {@link #FlinkKafkaProducer010(String, KeyedSerializationSchema, Properties)}
+	 * and call {@link #setWriteTimestampToKafka(boolean)}.
 	 */
+	@Deprecated
 	public static <T> FlinkKafkaProducer010Configuration<T> writeToKafkaWithTimestamps(DataStream<T> inStream,
 																					String topicId,
 																					KeyedSerializationSchema<T> serializationSchema,
@@ -105,7 +76,11 @@ public class FlinkKafkaProducer010<T> extends StreamSink<T> implements SinkFunct
 	 * @param topicId ID of the Kafka topic.
 	 * @param serializationSchema User defined (keyless) serialization schema.
 	 * @param producerConfig Properties with the producer configuration.
+	 *
+	 * @deprecated Use {@link #FlinkKafkaProducer010(String, KeyedSerializationSchema, Properties)}
+	 * and call {@link #setWriteTimestampToKafka(boolean)}.
 	 */
+	@Deprecated
 	public static <T> FlinkKafkaProducer010Configuration<T> writeToKafkaWithTimestamps(DataStream<T> inStream,
 																					String topicId,
 																					SerializationSchema<T> serializationSchema,
@@ -124,20 +99,24 @@ public class FlinkKafkaProducer010<T> extends StreamSink<T> implements SinkFunct
 	 *  @param serializationSchema A serializable serialization schema for turning user objects into a kafka-consumable byte[] supporting key/value messages
 	 *  @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is the only required argument.
 	 *  @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions.
+	 *
+	 * @deprecated Use {@link #FlinkKafkaProducer010(String, KeyedSerializationSchema, Properties, FlinkKafkaPartitioner)}
+	 * and call {@link #setWriteTimestampToKafka(boolean)}.
 	 */
+	@Deprecated
 	public static <T> FlinkKafkaProducer010Configuration<T> writeToKafkaWithTimestamps(DataStream<T> inStream,
 																					String topicId,
 																					KeyedSerializationSchema<T> serializationSchema,
 																					Properties producerConfig,
 																					FlinkKafkaPartitioner<T> customPartitioner) {
 
-		GenericTypeInfo<Object> objectTypeInfo = new GenericTypeInfo<>(Object.class);
 		FlinkKafkaProducer010<T> kafkaProducer = new FlinkKafkaProducer010<>(topicId, serializationSchema, producerConfig, customPartitioner);
-		SingleOutputStreamOperator<Object> transformation = inStream.transform("FlinKafkaProducer 0.10.x", objectTypeInfo, kafkaProducer);
-		return new FlinkKafkaProducer010Configuration<>(transformation, kafkaProducer);
+		DataStreamSink<T> streamSink = inStream.addSink(kafkaProducer);
+		return new FlinkKafkaProducer010Configuration<>(streamSink, inStream, kafkaProducer);
+
 	}
 
-	// ---------------------- Regular constructors w/o timestamp support  ------------------
+	// ---------------------- Regular constructors------------------
 
 	/**
 	 * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to
@@ -220,9 +199,7 @@ public class FlinkKafkaProducer010<T> extends StreamSink<T> implements SinkFunct
 	 * <p>This constructor does not allow writing timestamps to Kafka, it follow approach (a) (see above)
 	 */
 	public FlinkKafkaProducer010(String topicId, KeyedSerializationSchema<T> serializationSchema, Properties producerConfig, FlinkKafkaPartitioner<T> customPartitioner) {
-		// We create a Kafka 09 producer instance here and only "override" (by intercepting) the
-		// invoke call.
-		super(new FlinkKafkaProducer09<>(topicId, serializationSchema, producerConfig, customPartitioner));
+		super(topicId, serializationSchema, producerConfig, customPartitioner);
 	}
 
 	// ----------------------------- Deprecated constructors / factory methods  ---------------------------
@@ -250,11 +227,10 @@ public class FlinkKafkaProducer010<T> extends StreamSink<T> implements SinkFunct
 																					Properties producerConfig,
 																					KafkaPartitioner<T> customPartitioner) {
 
-		GenericTypeInfo<Object> objectTypeInfo = new GenericTypeInfo<>(Object.class);
 		FlinkKafkaProducer010<T> kafkaProducer =
 				new FlinkKafkaProducer010<>(topicId, serializationSchema, producerConfig, new FlinkKafkaDelegatePartitioner<>(customPartitioner));
-		SingleOutputStreamOperator<Object> transformation = inStream.transform("FlinKafkaProducer 0.10.x", objectTypeInfo, kafkaProducer);
-		return new FlinkKafkaProducer010Configuration<>(transformation, kafkaProducer);
+		DataStreamSink<T> streamSink = inStream.addSink(kafkaProducer);
+		return new FlinkKafkaProducer010Configuration<T>(streamSink, inStream, kafkaProducer);
 	}
 
 	/**
@@ -288,157 +264,75 @@ public class FlinkKafkaProducer010<T> extends StreamSink<T> implements SinkFunct
 	public FlinkKafkaProducer010(String topicId, KeyedSerializationSchema<T> serializationSchema, Properties producerConfig, KafkaPartitioner<T> customPartitioner) {
 		// We create a Kafka 09 producer instance here and only "override" (by intercepting) the
 		// invoke call.
-		super(new FlinkKafkaProducer09<>(topicId, serializationSchema, producerConfig, new FlinkKafkaDelegatePartitioner<>(customPartitioner)));
+		super(topicId, serializationSchema, producerConfig, customPartitioner);
 	}
 
-	// ----------------------------- Generic element processing  ---------------------------
+	/**
+	 * If set to true, Flink will write the (event time) timestamp attached to each record into Kafka.
+	 * Timestamps must be positive for Kafka to accept them.
+	 *
+	 * @param writeTimestampToKafka Flag indicating if Flink's internal timestamps are written to Kafka.
+	 */
+	public void setWriteTimestampToKafka(boolean writeTimestampToKafka) {
+		this.writeTimestampToKafka = writeTimestampToKafka;
+	}
 
-	private void invokeInternal(T next, long elementTimestamp) throws Exception {
 
-		final FlinkKafkaProducerBase<T> internalProducer = (FlinkKafkaProducerBase<T>) userFunction;
+	// ----------------------------- Generic element processing  ---------------------------
 
-		internalProducer.checkErroneous();
+	@Override
+	public void invoke(T value, Context context) throws Exception {
 
-		byte[] serializedKey = internalProducer.schema.serializeKey(next);
-		byte[] serializedValue = internalProducer.schema.serializeValue(next);
-		String targetTopic = internalProducer.schema.getTargetTopic(next);
+		checkErroneous();
+
+		byte[] serializedKey = schema.serializeKey(value);
+		byte[] serializedValue = schema.serializeValue(value);
+		String targetTopic = schema.getTargetTopic(value);
 		if (targetTopic == null) {
-			targetTopic = internalProducer.defaultTopicId;
+			targetTopic = defaultTopicId;
 		}
 
 		Long timestamp = null;
 		if (this.writeTimestampToKafka) {
-			timestamp = elementTimestamp;
+			timestamp = context.timestamp();
 		}
 
 		ProducerRecord<byte[], byte[]> record;
-		int[] partitions = internalProducer.topicPartitionsMap.get(targetTopic);
+		int[] partitions = topicPartitionsMap.get(targetTopic);
 		if (null == partitions) {
-			partitions = getPartitionsByTopic(targetTopic, internalProducer.producer);
-			internalProducer.topicPartitionsMap.put(targetTopic, partitions);
+			partitions = getPartitionsByTopic(targetTopic, producer);
+			topicPartitionsMap.put(targetTopic, partitions);
 		}
-		if (internalProducer.flinkKafkaPartitioner == null) {
+		if (flinkKafkaPartitioner == null) {
 			record = new ProducerRecord<>(targetTopic, null, timestamp, serializedKey, serializedValue);
 		} else {
-			record = new ProducerRecord<>(targetTopic, internalProducer.flinkKafkaPartitioner.partition(next, serializedKey, serializedValue, targetTopic, partitions), timestamp, serializedKey, serializedValue);
+			record = new ProducerRecord<>(targetTopic, flinkKafkaPartitioner.partition(value, serializedKey, serializedValue, targetTopic, partitions), timestamp, serializedKey, serializedValue);
 		}
-		if (internalProducer.flushOnCheckpoint) {
-			synchronized (internalProducer.pendingRecordsLock) {
-				internalProducer.pendingRecords++;
+		if (flushOnCheckpoint) {
+			synchronized (pendingRecordsLock) {
+				pendingRecords++;
 			}
 		}
-		internalProducer.producer.send(record, internalProducer.callback);
-	}
-
-	// ----------------- Helper methods implementing methods from SinkFunction and RichFunction (Approach (a)) ----
-
-	// ---- Configuration setters
-
-	/**
-	 * Defines whether the producer should fail on errors, or only log them.
-	 * If this is set to true, then exceptions will be only logged, if set to false,
-	 * exceptions will be eventually thrown and cause the streaming program to
-	 * fail (and enter recovery).
-	 *
-	 * <p>Method is only accessible for approach (a) (see above)
-	 *
-	 * @param logFailuresOnly The flag to indicate logging-only on exceptions.
-	 */
-	public void setLogFailuresOnly(boolean logFailuresOnly) {
-		final FlinkKafkaProducerBase<T> internalProducer = (FlinkKafkaProducerBase<T>) userFunction;
-		internalProducer.setLogFailuresOnly(logFailuresOnly);
-	}
-
-	/**
-	 * If set to true, the Flink producer will wait for all outstanding messages in the Kafka buffers
-	 * to be acknowledged by the Kafka producer on a checkpoint.
-	 * This way, the producer can guarantee that messages in the Kafka buffers are part of the checkpoint.
-	 *
-	 * <p>Method is only accessible for approach (a) (see above)
-	 *
-	 * @param flush Flag indicating the flushing mode (true = flush on checkpoint)
-	 */
-	public void setFlushOnCheckpoint(boolean flush) {
-		final FlinkKafkaProducerBase<T> internalProducer = (FlinkKafkaProducerBase<T>) userFunction;
-		internalProducer.setFlushOnCheckpoint(flush);
-	}
-
-	/**
-	 * This method is used for approach (a) (see above).
-	 */
-	@Override
-	public void open(Configuration parameters) throws Exception {
-		final FlinkKafkaProducerBase<T> internalProducer = (FlinkKafkaProducerBase<T>) userFunction;
-		internalProducer.open(parameters);
-	}
-
-	/**
-	 * This method is used for approach (a) (see above).
-	 */
-	@Override
-	public IterationRuntimeContext getIterationRuntimeContext() {
-		final FlinkKafkaProducerBase<T> internalProducer = (FlinkKafkaProducerBase<T>) userFunction;
-		return internalProducer.getIterationRuntimeContext();
-	}
-
-	/**
-	 * This method is used for approach (a) (see above).
-	 */
-	@Override
-	public void setRuntimeContext(RuntimeContext t) {
-		final FlinkKafkaProducerBase<T> internalProducer = (FlinkKafkaProducerBase<T>) userFunction;
-		internalProducer.setRuntimeContext(t);
-	}
-
-	/**
-	 * Invoke method for using the Sink as DataStream.addSink() sink.
-	 *
-	 * <p>This method is used for approach (a) (see above)
-	 *
-	 * @param value The input record.
-	 */
-	@Override
-	public void invoke(T value) throws Exception {
-		invokeInternal(value, Long.MAX_VALUE);
-	}
-
-	// ----------------- Helper methods and classes implementing methods from StreamSink (Approach (b)) ----
-
-	/**
-	 * Process method for using the sink with timestamp support.
-	 *
-	 * <p>This method is used for approach (b) (see above)
-	 */
-	@Override
-	public void processElement(StreamRecord<T> element) throws Exception {
-		invokeInternal(element.getValue(), element.getTimestamp());
-	}
-
-	@Override
-	public void initializeState(FunctionInitializationContext context) throws Exception {
-		final FlinkKafkaProducerBase<T> internalProducer = (FlinkKafkaProducerBase<T>) userFunction;
-		internalProducer.initializeState(context);
-	}
-
-	@Override
-	public void snapshotState(FunctionSnapshotContext context) throws Exception {
-		final FlinkKafkaProducerBase<T> internalProducer = (FlinkKafkaProducerBase<T>) userFunction;
-		internalProducer.snapshotState(context);
+		producer.send(record, callback);
 	}
 
 	/**
 	 * Configuration object returned by the writeToKafkaWithTimestamps() call.
+	 *
+	 * <p>This is only kept because it's part of the public API. It is not necessary anymore, now
+	 * that the {@link SinkFunction} interface provides timestamps.
 	 */
 	public static class FlinkKafkaProducer010Configuration<T> extends DataStreamSink<T> {
 
-		private final FlinkKafkaProducerBase wrappedProducerBase;
 		private final FlinkKafkaProducer010 producer;
 
-		private FlinkKafkaProducer010Configuration(DataStream stream, FlinkKafkaProducer010<T> producer) {
+		private FlinkKafkaProducer010Configuration(
+				DataStreamSink originalSink,
+				DataStream<T> inputStream,
+				FlinkKafkaProducer010<T> producer) {
 			//noinspection unchecked
-			super(stream, producer);
+			super(inputStream, originalSink.getTransformation().getOperator());
 			this.producer = producer;
-			this.wrappedProducerBase = (FlinkKafkaProducerBase) producer.userFunction;
 		}
 
 		/**
@@ -450,7 +344,7 @@ public class FlinkKafkaProducer010<T> extends StreamSink<T> implements SinkFunct
 		 * @param logFailuresOnly The flag to indicate logging-only on exceptions.
 		 */
 		public void setLogFailuresOnly(boolean logFailuresOnly) {
-			this.wrappedProducerBase.setLogFailuresOnly(logFailuresOnly);
+			producer.setLogFailuresOnly(logFailuresOnly);
 		}
 
 		/**
@@ -461,7 +355,7 @@ public class FlinkKafkaProducer010<T> extends StreamSink<T> implements SinkFunct
 		 * @param flush Flag indicating the flushing mode (true = flush on checkpoint)
 		 */
 		public void setFlushOnCheckpoint(boolean flush) {
-			this.wrappedProducerBase.setFlushOnCheckpoint(flush);
+			producer.setFlushOnCheckpoint(flush);
 		}
 
 		/**
@@ -471,7 +365,7 @@ public class FlinkKafkaProducer010<T> extends StreamSink<T> implements SinkFunct
 		 * @param writeTimestampToKafka Flag indicating if Flink's internal timestamps are written to Kafka.
 		 */
 		public void setWriteTimestampToKafka(boolean writeTimestampToKafka) {
-			this.producer.writeTimestampToKafka = writeTimestampToKafka;
+			producer.writeTimestampToKafka = writeTimestampToKafka;
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/6886f638/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java
index 76a2f84..befc1a1 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java
@@ -276,7 +276,7 @@ public abstract class FlinkKafkaProducerBase<IN> extends RichSinkFunction<IN> im
 	 * 		The incoming data
 	 */
 	@Override
-	public void invoke(IN next) throws Exception {
+	public void invoke(IN next, Context context) throws Exception {
 		// propagate asynchronous errors
 		checkErroneous();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/6886f638/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
index 000de52..35607dd 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
@@ -215,6 +215,9 @@ public abstract class KafkaProducerTestBase extends KafkaTestBase {
 	 * This test sets KafkaProducer so that it will not automatically flush the data and
 	 * simulate network failure between Flink and Kafka to check whether FlinkKafkaProducer
 	 * flushed records manually on snapshotState.
+	 *
+	 * <p>Due to legacy reasons there are two different ways of instantiating a Kafka 0.10 sink. The
+	 * parameter controls which method is used.
 	 */
 	protected void testOneToOneAtLeastOnce(boolean regularSink) throws Exception {
 		final String topic = regularSink ? "oneToOneTopicRegularSink" : "oneToOneTopicCustomOperator";

http://git-wip-us.apache.org/repos/asf/flink/blob/6886f638/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/SinkContextUtil.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/SinkContextUtil.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/SinkContextUtil.java
index 2749560..3b02ad0 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/SinkContextUtil.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/SinkContextUtil.java
@@ -43,14 +43,9 @@ public class SinkContextUtil {
 			}
 
 			@Override
-			public long timestamp() {
+			public Long timestamp() {
 				return timestamp;
 			}
-
-			@Override
-			public boolean hasTimestamp() {
-				return true;
-			}
 		};
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/6886f638/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/SinkFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/SinkFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/SinkFunction.java
index 15a77c4..74870bc 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/SinkFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/SinkFunction.java
@@ -31,22 +31,22 @@ import java.io.Serializable;
 public interface SinkFunction<IN> extends Function, Serializable {
 
 	/**
-	 * Function for standard sink behaviour. This function is called for every record.
-	 *
-	 * @param value The input record.
-	 * @throws Exception
 	 * @deprecated Use {@link #invoke(Object, Context)}.
 	 */
 	@Deprecated
-	default void invoke(IN value) throws Exception {
-	}
+	default void invoke(IN value) throws Exception {}
 
 	/**
 	 * Writes the given value to the sink. This function is called for every record.
 	 *
+	 * <p>You have to override this method when implementing a {@code SinkFunction}, this is a
+	 * {@code default} method for backward compatibility with the old-style method only.
+	 *
 	 * @param value The input record.
 	 * @param context Additional context about the input record.
-	 * @throws Exception
+	 *
+	 * @throws Exception This method may throw exceptions. Throwing an exception will cause the operation
+	 *                   to fail and may trigger recovery.
 	 */
 	default void invoke(IN value, Context context) throws Exception {
 		invoke(value);
@@ -72,15 +72,9 @@ public interface SinkFunction<IN> extends Function, Serializable {
 		long currentWatermark();
 
 		/**
-		 * Returns the timestamp of the current input record.
-		 */
-		long timestamp();
-
-		/**
-		 * Checks whether this record has a timestamp.
-		 *
-		 * @return True if the record has a timestamp, false if not.
+		 * Returns the timestamp of the current input record or {@code null} if the element does not
+		 * have an assigned timestamp.
 		 */
-		boolean hasTimestamp();
+		Long timestamp();
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/6886f638/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSink.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSink.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSink.java
index f4b09af..667e130 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSink.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSink.java
@@ -91,19 +91,11 @@ public class StreamSink<IN> extends AbstractUdfStreamOperator<Object, SinkFuncti
 		}
 
 		@Override
-		public long timestamp() {
-			if (!element.hasTimestamp()) {
-				throw new IllegalStateException(
-					"Record has no timestamp. Is the time characteristic set to 'ProcessingTime', or " +
-							"did you forget to call 'DataStream.assignTimestampsAndWatermarks(...)'?");
-
+		public Long timestamp() {
+			if (element.hasTimestamp()) {
+				return element.getTimestamp();
 			}
-			return element.getTimestamp();
-		}
-
-		public boolean hasTimestamp() {
-			return element.hasTimestamp();
+			return null;
 		}
-
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/6886f638/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamSinkOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamSinkOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamSinkOperatorTest.java
index 500a52a..9085ade 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamSinkOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamSinkOperatorTest.java
@@ -96,7 +96,8 @@ public class StreamSinkOperatorTest extends TestLogger {
 		@Override
 		public void invoke(
 			T value, Context context) throws Exception {
-			if (context.hasTimestamp()) {
+			Long timestamp = context.timestamp();
+			if (timestamp != null) {
 				data.add(
 					new Tuple4<>(
 						context.currentWatermark(),